You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ma...@apache.org on 2019/08/12 01:55:52 UTC
[servicecomb-service-center] branch master updated: [SCB-1429]
Abstract storage operation
This is an automated email from the ASF dual-hosted git repository.
mabin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 78c431e [SCB-1429] Abstract storage operation
new 606c91c Merge pull request #571 from ChinX/syncer
78c431e is described below
commit 78c431ee0b3c19338bf370fb426e26620ce0361d
Author: chinx <c5...@126.com>
AuthorDate: Fri Aug 9 15:18:09 2019 +0800
[SCB-1429] Abstract storage operation
---
syncer/servicecenter/storage/operation.go | 72 ++++++++++++++++++++++++++++
syncer/servicecenter/storage/storage.go | 78 +++++++++++++++----------------
2 files changed, 110 insertions(+), 40 deletions(-)
diff --git a/syncer/servicecenter/storage/operation.go b/syncer/servicecenter/storage/operation.go
new file mode 100644
index 0000000..f0325a8
--- /dev/null
+++ b/syncer/servicecenter/storage/operation.go
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package storage
+
+import (
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/coreos/etcd/clientv3"
+)
+
+var (
+ // mappingsKey the key of instances mapping in etcd
+ mappingsKey = "/syncer/v1/mappings"
+ // servicesKey the key of service in etcd
+ servicesKey = "/syncer/v1/services"
+ // instancesKey the key of instance in etcd
+ instancesKey = "/syncer/v1/instances"
+)
+
+func putServiceOp(serviceId string, data []byte) clientv3.Op {
+ return clientv3.OpPut(servicesKey+"/"+serviceId, util.BytesToStringWithNoCopy(data))
+}
+
+func getServicesOp() clientv3.Op {
+ return clientv3.OpGet(servicesKey, clientv3.WithPrefix())
+}
+
+func deleteServiceOp(serviceId string) clientv3.Op {
+ return clientv3.OpDelete(servicesKey + "/" + serviceId)
+}
+
+func putInstanceOp(instanceID string, data []byte) clientv3.Op {
+ return clientv3.OpPut(instancesKey+"/"+instanceID, util.BytesToStringWithNoCopy(data))
+}
+
+func getInstancesOp() clientv3.Op {
+ return clientv3.OpGet(instancesKey, clientv3.WithPrefix())
+}
+
+func deleteInstanceOp(instanceID string) clientv3.Op {
+ return clientv3.OpDelete(instancesKey + "/" + instanceID)
+}
+
+func putMappingOp(cluster, mappingID string, data []byte) clientv3.Op {
+ return clientv3.OpPut(mappingsKey+"/"+cluster+"/"+mappingID, util.BytesToStringWithNoCopy(data))
+}
+
+func getClusterMappingsOp(cluster string) clientv3.Op {
+ return clientv3.OpGet(mappingsKey+"/"+cluster, clientv3.WithPrefix())
+}
+
+func getAllMappingsOp() clientv3.Op {
+ return clientv3.OpGet(mappingsKey, clientv3.WithPrefix())
+}
+
+func delMappingOp(cluster, mappingID string) clientv3.Op {
+ return clientv3.OpDelete(mappingsKey + "/" + cluster + "/" + mappingID)
+}
\ No newline at end of file
diff --git a/syncer/servicecenter/storage/storage.go b/syncer/servicecenter/storage/storage.go
index c6abf44..14bf91b 100644
--- a/syncer/servicecenter/storage/storage.go
+++ b/syncer/servicecenter/storage/storage.go
@@ -22,21 +22,11 @@ import (
"sync"
"github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/pkg/util"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
"github.com/coreos/etcd/clientv3"
"github.com/gogo/protobuf/proto"
)
-var (
- // mappingsKey the key of instances mapping in etcd
- mappingsKey = "/syncer/v1/mappings"
- // servicesKey the key of service in etcd
- servicesKey = "/syncer/v1/services"
- // instancesKey the key of instance in etcd
- instancesKey = "/syncer/v1/instances"
-)
-
type Storage interface {
GetData() (data *pb.SyncData)
UpdateData(data *pb.SyncData)
@@ -60,15 +50,21 @@ func NewStorage(engine clientv3.KV) Storage {
return storage
}
-// getPrefixKey Get data from etcd based on the prefix key
-func (s *storage) getPrefixKey(prefix string, handler func(key, val []byte) (next bool)) {
- resp, err := s.engine.Get(context.Background(), prefix, clientv3.WithPrefix())
+// getValue Get value from etcd by key
+func (s *storage) getValue(opt clientv3.Op, handler func(key, val []byte) (next bool)) {
+ resp, err := s.engine.Do(context.Background(), opt)
if err != nil {
- log.Errorf(err, "Get mapping from etcd failed: %s", err)
+ log.Errorf(err, "Do etcd operation failed: %s", err)
return
}
- for _, kv := range resp.Kvs {
+ getResp := resp.Get()
+ if getResp == nil {
+ log.Error("Data from etcd is empty", nil)
+ return
+ }
+
+ for _, kv := range resp.Get().Kvs {
if !handler(kv.Key, kv.Value) {
break
}
@@ -132,8 +128,9 @@ next:
continue next
}
}
- key := mappingsKey + "/" + entry.ClusterName + "/" + entry.OrgInstanceID
- if _, err := s.engine.Delete(context.Background(), key); err != nil {
+
+ delOp := delMappingOp(entry.ClusterName, entry.OrgInstanceID)
+ if _, err := s.engine.Do(context.Background(),delOp); err != nil {
log.Errorf(err, "Delete instance clusterName=%s instanceID=%s failed", entry.ClusterName, entry.OrgInstanceID)
}
@@ -144,13 +141,14 @@ next:
// UpdateServices Update services to storage
func (s *storage) UpdateServices(services []*pb.SyncService) {
for _, val := range services {
- key := servicesKey + "/" + val.ServiceId
data, err := proto.Marshal(val)
if err != nil {
log.Errorf(err, "Proto marshal failed: %s", err)
continue
}
- _, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+
+ updateOp := putServiceOp(val.ServiceId, data)
+ _, err = s.engine.Do(context.Background(), updateOp)
if err != nil {
log.Errorf(err, "Save service to etcd failed: %s", err)
}
@@ -160,7 +158,7 @@ func (s *storage) UpdateServices(services []*pb.SyncService) {
// GetServices Get services from storage
func (s *storage) GetServices() (services []*pb.SyncService) {
services = make([]*pb.SyncService, 0, 10)
- s.getPrefixKey(servicesKey, func(key, val []byte) (next bool) {
+ s.getValue(getServicesOp(), func(key, val []byte) (next bool) {
next = true
item := &pb.SyncService{}
if err := proto.Unmarshal(val, item); err != nil {
@@ -182,8 +180,8 @@ func (s *storage) DeleteServices(services []*pb.SyncService) {
// DeleteServices Delete services from storage
func (s *storage) deleteService(serviceId string) {
- key := servicesKey + "/" + serviceId
- _, err := s.engine.Delete(context.Background(), key)
+ delOp := deleteServiceOp(serviceId)
+ _, err := s.engine.Do(context.Background(), delOp)
if err != nil {
log.Errorf(err, "Delete service from etcd failed: %s", err)
}
@@ -192,13 +190,14 @@ func (s *storage) deleteService(serviceId string) {
// UpdateInstances Update instances to storage
func (s *storage) UpdateInstances(instances []*pb.SyncInstance) {
for _, val := range instances {
- key := instancesKey + "/" + val.InstanceId
data, err := proto.Marshal(val)
if err != nil {
log.Errorf(err, "Proto marshal failed: %s", err)
continue
}
- _, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+
+ updateOp := putInstanceOp(val.InstanceId, data)
+ _, err = s.engine.Do(context.Background(), updateOp)
if err != nil {
log.Errorf(err, "Save instance to etcd failed: %s", err)
}
@@ -208,7 +207,7 @@ func (s *storage) UpdateInstances(instances []*pb.SyncInstance) {
// GetInstances Get instances from storage
func (s *storage) GetInstances() (instances []*pb.SyncInstance) {
instances = make([]*pb.SyncInstance, 0, 10)
- s.getPrefixKey(instancesKey, func(key, val []byte) (next bool) {
+ s.getValue(getInstancesOp(), func(key, val []byte) (next bool) {
next = true
item := &pb.SyncInstance{}
if err := proto.Unmarshal(val, item); err != nil {
@@ -229,8 +228,8 @@ func (s *storage) DeleteInstances(instances []*pb.SyncInstance) {
}
func (s *storage) deleteInstance(instanceID string) {
- key := instancesKey + "/" + instanceID
- _, err := s.engine.Delete(context.Background(), key)
+ delOp := deleteInstanceOp(instanceID)
+ _, err := s.engine.Do(context.Background(), delOp)
if err != nil {
log.Errorf(err, "Delete instance from etcd failed: %s", err)
}
@@ -240,13 +239,14 @@ func (s *storage) deleteInstance(instanceID string) {
func (s *storage) UpdateMapByCluster(clusterName string, mapping pb.SyncMapping) {
newMaps := make(pb.SyncMapping, 0, len(mapping))
for _, val := range mapping {
- key := mappingsKey + "/" + clusterName + "/" + val.OrgInstanceID
data, err := proto.Marshal(val)
if err != nil {
log.Errorf(err, "Proto marshal failed: %s", err)
continue
}
- _, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+
+ putOp := putMappingOp(clusterName, val.OrgInstanceID, data)
+ _, err = s.engine.Do(context.Background(), putOp)
if err != nil {
log.Errorf(err, "Save mapping to etcd failed: %s", err)
}
@@ -257,19 +257,17 @@ func (s *storage) UpdateMapByCluster(clusterName string, mapping pb.SyncMapping)
// GetMapByCluster get map by clusterName of other cluster
func (s *storage) GetMapByCluster(clusterName string) (mapping pb.SyncMapping) {
- maps := make(pb.SyncMapping, 0, 10)
- s.getPrefixKey(mappingsKey+"/"+clusterName, func(key, val []byte) (next bool) {
+ s.getValue(getClusterMappingsOp(clusterName), func(key, val []byte) (next bool) {
next = true
item := &pb.MappingEntry{}
if err := proto.Unmarshal(val, item); err != nil {
log.Errorf(err, "Proto unmarshal '%s' failed: %s", val, err)
return
}
-
- maps = append(maps, item)
+ mapping = append(mapping, item)
return
})
- return maps
+ return
}
// UpdateMaps update all maps to etcd
@@ -277,13 +275,14 @@ func (s *storage) UpdateMaps(maps pb.SyncMapping) {
srcMaps := s.GetMaps()
mappings := make(pb.SyncMapping, 0, len(maps))
for _, val := range maps {
- key := mappingsKey + "/" + val.ClusterName + "/" + val.OrgInstanceID
data, err := proto.Marshal(val)
if err != nil {
log.Errorf(err, "Proto marshal failed: %s", err)
continue
}
- _, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+
+ putOp := putMappingOp(val.ClusterName, val.OrgInstanceID, data)
+ _, err = s.engine.Do(context.Background(), putOp)
if err != nil {
log.Errorf(err, "Save mapping to etcd failed: %s", err)
continue
@@ -295,16 +294,15 @@ func (s *storage) UpdateMaps(maps pb.SyncMapping) {
// GetMaps Get maps from storage
func (s *storage) GetMaps() (mapping pb.SyncMapping) {
- maps := make(pb.SyncMapping, 0, 10)
- s.getPrefixKey(mappingsKey, func(key, val []byte) (next bool) {
+ s.getValue(getAllMappingsOp(), func(key, val []byte) (next bool) {
next = true
item := &pb.MappingEntry{}
if err := proto.Unmarshal(val, item); err != nil {
log.Errorf(err, "Proto unmarshal '%s' failed: %s", val, err)
return
}
- maps = append(maps, item)
+ mapping = append(mapping, item)
return
})
- return maps
+ return
}