You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ma...@apache.org on 2019/08/12 01:55:52 UTC

[servicecomb-service-center] branch master updated: [SCB-1429] Abstract storage operation

This is an automated email from the ASF dual-hosted git repository.

mabin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 78c431e  [SCB-1429] Abstract storage operation
     new 606c91c  Merge pull request #571 from ChinX/syncer
78c431e is described below

commit 78c431ee0b3c19338bf370fb426e26620ce0361d
Author: chinx <c5...@126.com>
AuthorDate: Fri Aug 9 15:18:09 2019 +0800

    [SCB-1429] Abstract storage operation
---
 syncer/servicecenter/storage/operation.go | 72 ++++++++++++++++++++++++++++
 syncer/servicecenter/storage/storage.go   | 78 +++++++++++++++----------------
 2 files changed, 110 insertions(+), 40 deletions(-)

diff --git a/syncer/servicecenter/storage/operation.go b/syncer/servicecenter/storage/operation.go
new file mode 100644
index 0000000..f0325a8
--- /dev/null
+++ b/syncer/servicecenter/storage/operation.go
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package storage
+
+import (
+	"github.com/apache/servicecomb-service-center/pkg/util"
+	"github.com/coreos/etcd/clientv3"
+)
+
+var (
+	// mappingsKey the key of instances mapping in etcd
+	mappingsKey = "/syncer/v1/mappings"
+	// servicesKey the key of service in etcd
+	servicesKey = "/syncer/v1/services"
+	// instancesKey the key of instance in etcd
+	instancesKey = "/syncer/v1/instances"
+)
+
+func putServiceOp(serviceId string, data []byte) clientv3.Op {
+	return clientv3.OpPut(servicesKey+"/"+serviceId, util.BytesToStringWithNoCopy(data))
+}
+
+func getServicesOp() clientv3.Op {
+	return clientv3.OpGet(servicesKey, clientv3.WithPrefix())
+}
+
+func deleteServiceOp(serviceId string) clientv3.Op {
+	return clientv3.OpDelete(servicesKey + "/" + serviceId)
+}
+
+func putInstanceOp(instanceID string, data []byte) clientv3.Op {
+	return clientv3.OpPut(instancesKey+"/"+instanceID, util.BytesToStringWithNoCopy(data))
+}
+
+func getInstancesOp() clientv3.Op {
+	return clientv3.OpGet(instancesKey, clientv3.WithPrefix())
+}
+
+func deleteInstanceOp(instanceID string) clientv3.Op {
+	return clientv3.OpDelete(instancesKey + "/" + instanceID)
+}
+
+func putMappingOp(cluster, mappingID string, data []byte) clientv3.Op {
+	return clientv3.OpPut(mappingsKey+"/"+cluster+"/"+mappingID, util.BytesToStringWithNoCopy(data))
+}
+
+func getClusterMappingsOp(cluster string) clientv3.Op {
+	return clientv3.OpGet(mappingsKey+"/"+cluster, clientv3.WithPrefix())
+}
+
+func getAllMappingsOp() clientv3.Op {
+	return clientv3.OpGet(mappingsKey, clientv3.WithPrefix())
+}
+
+func delMappingOp(cluster, mappingID string) clientv3.Op {
+	return clientv3.OpDelete(mappingsKey + "/" + cluster + "/" + mappingID)
+}
\ No newline at end of file
diff --git a/syncer/servicecenter/storage/storage.go b/syncer/servicecenter/storage/storage.go
index c6abf44..14bf91b 100644
--- a/syncer/servicecenter/storage/storage.go
+++ b/syncer/servicecenter/storage/storage.go
@@ -22,21 +22,11 @@ import (
 	"sync"
 
 	"github.com/apache/servicecomb-service-center/pkg/log"
-	"github.com/apache/servicecomb-service-center/pkg/util"
 	pb "github.com/apache/servicecomb-service-center/syncer/proto"
 	"github.com/coreos/etcd/clientv3"
 	"github.com/gogo/protobuf/proto"
 )
 
-var (
-	// mappingsKey the key of instances mapping in etcd
-	mappingsKey = "/syncer/v1/mappings"
-	// servicesKey the key of service in etcd
-	servicesKey = "/syncer/v1/services"
-	// instancesKey the key of instance in etcd
-	instancesKey = "/syncer/v1/instances"
-)
-
 type Storage interface {
 	GetData() (data *pb.SyncData)
 	UpdateData(data *pb.SyncData)
@@ -60,15 +50,21 @@ func NewStorage(engine clientv3.KV) Storage {
 	return storage
 }
 
-// getPrefixKey Get data from etcd based on the prefix key
-func (s *storage) getPrefixKey(prefix string, handler func(key, val []byte) (next bool)) {
-	resp, err := s.engine.Get(context.Background(), prefix, clientv3.WithPrefix())
+// getValue Get value from etcd by key
+func (s *storage) getValue(opt clientv3.Op, handler func(key, val []byte) (next bool)) {
+	resp, err := s.engine.Do(context.Background(), opt)
 	if err != nil {
-		log.Errorf(err, "Get mapping from etcd failed: %s", err)
+		log.Errorf(err, "Do etcd operation failed: %s", err)
 		return
 	}
 
-	for _, kv := range resp.Kvs {
+	getResp := resp.Get()
+	if getResp == nil {
+		log.Error("Data from etcd is empty", nil)
+		return
+	}
+
+	for _, kv := range resp.Get().Kvs {
 		if !handler(kv.Key, kv.Value) {
 			break
 		}
@@ -132,8 +128,9 @@ next:
 				continue next
 			}
 		}
-		key := mappingsKey + "/" + entry.ClusterName + "/" + entry.OrgInstanceID
-		if _, err := s.engine.Delete(context.Background(), key); err != nil {
+
+		delOp := delMappingOp(entry.ClusterName, entry.OrgInstanceID)
+		if _, err := s.engine.Do(context.Background(),delOp); err != nil {
 			log.Errorf(err, "Delete instance clusterName=%s instanceID=%s failed", entry.ClusterName, entry.OrgInstanceID)
 		}
 
@@ -144,13 +141,14 @@ next:
 // UpdateServices Update services to storage
 func (s *storage) UpdateServices(services []*pb.SyncService) {
 	for _, val := range services {
-		key := servicesKey + "/" + val.ServiceId
 		data, err := proto.Marshal(val)
 		if err != nil {
 			log.Errorf(err, "Proto marshal failed: %s", err)
 			continue
 		}
-		_, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+
+		updateOp := putServiceOp(val.ServiceId, data)
+		_, err = s.engine.Do(context.Background(), updateOp)
 		if err != nil {
 			log.Errorf(err, "Save service to etcd failed: %s", err)
 		}
@@ -160,7 +158,7 @@ func (s *storage) UpdateServices(services []*pb.SyncService) {
 // GetServices Get services from storage
 func (s *storage) GetServices() (services []*pb.SyncService) {
 	services = make([]*pb.SyncService, 0, 10)
-	s.getPrefixKey(servicesKey, func(key, val []byte) (next bool) {
+	s.getValue(getServicesOp(), func(key, val []byte) (next bool) {
 		next = true
 		item := &pb.SyncService{}
 		if err := proto.Unmarshal(val, item); err != nil {
@@ -182,8 +180,8 @@ func (s *storage) DeleteServices(services []*pb.SyncService) {
 
 // DeleteServices Delete services from storage
 func (s *storage) deleteService(serviceId string) {
-	key := servicesKey + "/" + serviceId
-	_, err := s.engine.Delete(context.Background(), key)
+	delOp := deleteServiceOp(serviceId)
+	_, err := s.engine.Do(context.Background(), delOp)
 	if err != nil {
 		log.Errorf(err, "Delete service from etcd failed: %s", err)
 	}
@@ -192,13 +190,14 @@ func (s *storage) deleteService(serviceId string) {
 // UpdateInstances Update instances to storage
 func (s *storage) UpdateInstances(instances []*pb.SyncInstance) {
 	for _, val := range instances {
-		key := instancesKey + "/" + val.InstanceId
 		data, err := proto.Marshal(val)
 		if err != nil {
 			log.Errorf(err, "Proto marshal failed: %s", err)
 			continue
 		}
-		_, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+
+		updateOp := putInstanceOp(val.InstanceId, data)
+		_, err = s.engine.Do(context.Background(), updateOp)
 		if err != nil {
 			log.Errorf(err, "Save instance to etcd failed: %s", err)
 		}
@@ -208,7 +207,7 @@ func (s *storage) UpdateInstances(instances []*pb.SyncInstance) {
 // GetInstances Get instances from storage
 func (s *storage) GetInstances() (instances []*pb.SyncInstance) {
 	instances = make([]*pb.SyncInstance, 0, 10)
-	s.getPrefixKey(instancesKey, func(key, val []byte) (next bool) {
+	s.getValue(getInstancesOp(), func(key, val []byte) (next bool) {
 		next = true
 		item := &pb.SyncInstance{}
 		if err := proto.Unmarshal(val, item); err != nil {
@@ -229,8 +228,8 @@ func (s *storage) DeleteInstances(instances []*pb.SyncInstance) {
 }
 
 func (s *storage) deleteInstance(instanceID string) {
-	key := instancesKey + "/" + instanceID
-	_, err := s.engine.Delete(context.Background(), key)
+	delOp := deleteInstanceOp(instanceID)
+	_, err := s.engine.Do(context.Background(), delOp)
 	if err != nil {
 		log.Errorf(err, "Delete instance from etcd failed: %s", err)
 	}
@@ -240,13 +239,14 @@ func (s *storage) deleteInstance(instanceID string) {
 func (s *storage) UpdateMapByCluster(clusterName string, mapping pb.SyncMapping) {
 	newMaps := make(pb.SyncMapping, 0, len(mapping))
 	for _, val := range mapping {
-		key := mappingsKey + "/" + clusterName + "/" + val.OrgInstanceID
 		data, err := proto.Marshal(val)
 		if err != nil {
 			log.Errorf(err, "Proto marshal failed: %s", err)
 			continue
 		}
-		_, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+
+		putOp := putMappingOp(clusterName, val.OrgInstanceID, data)
+		_, err = s.engine.Do(context.Background(), putOp)
 		if err != nil {
 			log.Errorf(err, "Save mapping to etcd failed: %s", err)
 		}
@@ -257,19 +257,17 @@ func (s *storage) UpdateMapByCluster(clusterName string, mapping pb.SyncMapping)
 
 // GetMapByCluster get map by clusterName of other cluster
 func (s *storage) GetMapByCluster(clusterName string) (mapping pb.SyncMapping) {
-	maps := make(pb.SyncMapping, 0, 10)
-	s.getPrefixKey(mappingsKey+"/"+clusterName, func(key, val []byte) (next bool) {
+	s.getValue(getClusterMappingsOp(clusterName), func(key, val []byte) (next bool) {
 		next = true
 		item := &pb.MappingEntry{}
 		if err := proto.Unmarshal(val, item); err != nil {
 			log.Errorf(err, "Proto unmarshal '%s' failed: %s", val, err)
 			return
 		}
-
-		maps = append(maps, item)
+		mapping = append(mapping, item)
 		return
 	})
-	return maps
+	return
 }
 
 // UpdateMaps update all maps to etcd
@@ -277,13 +275,14 @@ func (s *storage) UpdateMaps(maps pb.SyncMapping) {
 	srcMaps := s.GetMaps()
 	mappings := make(pb.SyncMapping, 0, len(maps))
 	for _, val := range maps {
-		key := mappingsKey + "/" + val.ClusterName + "/" + val.OrgInstanceID
 		data, err := proto.Marshal(val)
 		if err != nil {
 			log.Errorf(err, "Proto marshal failed: %s", err)
 			continue
 		}
-		_, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+
+		putOp := putMappingOp(val.ClusterName, val.OrgInstanceID, data)
+		_, err = s.engine.Do(context.Background(), putOp)
 		if err != nil {
 			log.Errorf(err, "Save mapping to etcd failed: %s", err)
 			continue
@@ -295,16 +294,15 @@ func (s *storage) UpdateMaps(maps pb.SyncMapping) {
 
 // GetMaps Get maps from storage
 func (s *storage) GetMaps() (mapping pb.SyncMapping) {
-	maps := make(pb.SyncMapping, 0, 10)
-	s.getPrefixKey(mappingsKey, func(key, val []byte) (next bool) {
+	s.getValue(getAllMappingsOp(), func(key, val []byte) (next bool) {
 		next = true
 		item := &pb.MappingEntry{}
 		if err := proto.Unmarshal(val, item); err != nil {
 			log.Errorf(err, "Proto unmarshal '%s' failed: %s", val, err)
 			return
 		}
-		maps = append(maps, item)
+		mapping = append(mapping, item)
 		return
 	})
-	return maps
+	return
 }