Posted to notifications@dubbo.apache.org by GitBox <gi...@apache.org> on 2020/06/13 15:17:54 UTC
[GitHub] [dubbo-go] flycash opened a new pull request #604: Application-level Registry Model
flycash opened a new pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604
**What this PR does**:
Milestone 1.5.0
**Which issue(s) this PR fixes**:
Fixes #
**Special notes for your reviewer**:
**Does this PR introduce a user-facing change?**:
```release-note
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org
[GitHub] [dubbo-go] codecov-commenter commented on pull request #604: Ftr: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#issuecomment-653771940
# [Codecov](https://codecov.io/gh/apache/dubbo-go/pull/604?src=pr&el=h1) Report
> Merging [#604](https://codecov.io/gh/apache/dubbo-go/pull/604?src=pr&el=desc) into [develop](https://codecov.io/gh/apache/dubbo-go/commit/9697b2b588a84c1905292b8d831a4a7871397535&el=desc) will **decrease** coverage by `3.47%`.
> The diff coverage is `47.83%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/dubbo-go/pull/604/graphs/tree.svg?width=650&height=150&src=pr&token=dcPE6RyFAL)](https://codecov.io/gh/apache/dubbo-go/pull/604?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## develop #604 +/- ##
===========================================
- Coverage 67.05% 63.57% -3.48%
===========================================
Files 190 232 +42
Lines 9907 12014 +2107
===========================================
+ Hits 6643 7638 +995
- Misses 2609 3629 +1020
- Partials 655 747 +92
```
| [Impacted Files](https://codecov.io/gh/apache/dubbo-go/pull/604?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [cluster/directory/base\_directory.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y2x1c3Rlci9kaXJlY3RvcnkvYmFzZV9kaXJlY3RvcnkuZ28=) | `62.50% <ø> (+5.68%)` | :arrow_up: |
| [common/extension/metadata\_report\_factory.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL2V4dGVuc2lvbi9tZXRhZGF0YV9yZXBvcnRfZmFjdG9yeS5nbw==) | `0.00% <0.00%> (ø)` | |
| [common/extension/metadata\_service.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL2V4dGVuc2lvbi9tZXRhZGF0YV9zZXJ2aWNlLmdv) | `0.00% <0.00%> (ø)` | |
| [common/extension/metadata\_service\_proxy\_factory.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL2V4dGVuc2lvbi9tZXRhZGF0YV9zZXJ2aWNlX3Byb3h5X2ZhY3RvcnkuZ28=) | `0.00% <0.00%> (ø)` | |
| [common/extension/service\_discovery.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL2V4dGVuc2lvbi9zZXJ2aWNlX2Rpc2NvdmVyeS5nbw==) | `0.00% <0.00%> (ø)` | |
| [common/extension/service\_instance\_customizer.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL2V4dGVuc2lvbi9zZXJ2aWNlX2luc3RhbmNlX2N1c3RvbWl6ZXIuZ28=) | `0.00% <0.00%> (ø)` | |
| [...mon/extension/service\_instance\_selector\_factory.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL2V4dGVuc2lvbi9zZXJ2aWNlX2luc3RhbmNlX3NlbGVjdG9yX2ZhY3RvcnkuZ28=) | `0.00% <0.00%> (ø)` | |
| [common/extension/service\_name\_mapping.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL2V4dGVuc2lvbi9zZXJ2aWNlX25hbWVfbWFwcGluZy5nbw==) | `0.00% <0.00%> (ø)` | |
| [...ommon/observer/dispatcher/mock\_event\_dispatcher.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL29ic2VydmVyL2Rpc3BhdGNoZXIvbW9ja19ldmVudF9kaXNwYXRjaGVyLmdv) | `0.00% <0.00%> (ø)` | |
| [common/url.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL3VybC5nbw==) | `65.95% <0.00%> (-2.07%)` | :arrow_down: |
| ... and [133 more](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/dubbo-go/pull/604?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/dubbo-go/pull/604?src=pr&el=footer). Last update [9697b2b...8ad299c](https://codecov.io/gh/apache/dubbo-go/pull/604?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[GitHub] [dubbo-go] fangyincheng commented on a change in pull request #604: Ftr: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
fangyincheng commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r447877267
##########
File path: registry/etcdv3/service_discovery.go
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcdv3
+
+import (
+ "fmt"
+ "sync"
+ "time"
+)
+
+import (
+ gxset "github.com/dubbogo/gost/container/set"
+ gxpage "github.com/dubbogo/gost/page"
+ "github.com/hashicorp/vault/helper/jsonutil"
+ perrors "github.com/pkg/errors"
+)
+
+import (
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/config"
+ "github.com/apache/dubbo-go/registry"
+ "github.com/apache/dubbo-go/remoting"
+ "github.com/apache/dubbo-go/remoting/etcdv3"
+)
+
+const (
+ ROOT = "/services"
+)
+
+var (
+ initLock sync.Mutex
+)
+
+func init() {
+ extension.SetServiceDiscovery(constant.ETCDV3_KEY, newEtcdV3ServiceDiscovery)
+}
+
+// new etcd service discovery struct
+type etcdV3ServiceDiscovery struct {
+ // descriptor is a short string about the basic information of this instance
+ descriptor string
+ // client is current Etcdv3 client
+ client *etcdv3.Client
+ // serviceInstance is current serviceInstance
+ serviceInstance *registry.ServiceInstance
+ // services is when register or update will add service name
+ services *gxset.HashSet
+ // child listener
+ childListenerMap map[string]*etcdv3.EventListener
+}
+
+// basic information of this instance
+func (e *etcdV3ServiceDiscovery) String() string {
+ return e.descriptor
+}
+
+// Destory service discovery
+func (e *etcdV3ServiceDiscovery) Destroy() error {
+ if e.client != nil {
+ e.client.Close()
+ }
+ return nil
+}
+
+// Register will register an instance of ServiceInstance to registry
+func (e *etcdV3ServiceDiscovery) Register(instance registry.ServiceInstance) error {
+
+ e.serviceInstance = &instance
+
+ path := toPath(instance)
+
+ if nil != e.client {
+ ins, err := jsonutil.EncodeJSON(instance)
+ if err == nil {
+ err = e.client.RegisterTemp(path, string(ins))
+ if err != nil {
+ logger.Errorf("cannot register the instance: %s", string(ins), err)
+ } else {
+ e.services.Add(instance.GetServiceName())
+ }
+ }
+ }
+
+ return nil
+}
+
+// Update will update the data of the instance in registry
+func (e *etcdV3ServiceDiscovery) Update(instance registry.ServiceInstance) error {
+ path := toPath(instance)
+
+ if nil != e.client {
+ ins, err := jsonutil.EncodeJSON(instance)
+ if nil == err {
+ e.client.RegisterTemp(path, string(ins))
+ e.services.Add(instance.GetServiceName())
+ }
+ }
+
+ return nil
+}
+
+// Unregister will unregister this instance from registry
+func (e *etcdV3ServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
+ path := toPath(instance)
+
+ if nil != e.client {
+ err := e.client.Delete(path)
+ e.services.Remove(instance.GetServiceName())
+ e.serviceInstance = nil
+ return err
+ }
+
+ return nil
+}
+
+// ----------------- discovery -------------------
+// GetDefaultPageSize will return the default page size
+func (e *etcdV3ServiceDiscovery) GetDefaultPageSize() int {
+ return registry.DefaultPageSize
+}
+
+// GetServices will return the all service names.
+func (e *etcdV3ServiceDiscovery) GetServices() *gxset.HashSet {
+ return e.services
+}
+
+// GetInstances will return all service instances with serviceName
+func (e *etcdV3ServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
+
+ if nil != e.client {
+ // get keys and values
+ _, vList, err := e.client.GetChildrenKVList(toParentPath(serviceName))
+ if nil == err {
+ serviceInstances := make([]registry.ServiceInstance, 0, len(vList))
+ for _, v := range vList {
+ instance := &registry.DefaultServiceInstance{}
+ err = jsonutil.DecodeJSON([]byte(v), &instance)
+ if nil == err {
+ serviceInstances = append(serviceInstances, instance)
+ }
+ }
+ return serviceInstances
+ }
+ perrors.New(fmt.Sprintf("could not getChildrenKVList the err is:%v", err))
Review comment:
The error created by `perrors.New` is not assigned to anything or returned?
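To illustrate the point of this comment, here is a minimal sketch of one way to avoid dropping the error. It is not the code from the PR: `fetchValues` and `getInstances` are hypothetical stand-ins, and returning an error assumes the method signature may change (the interface shown in the diff returns only a slice, so logging the error would be the alternative there).

```go
package main

import (
	"errors"
	"fmt"
)

// fetchValues is a hypothetical stand-in for a registry lookup
// such as the GetChildrenKVList call in the diff.
func fetchValues(path string) ([]string, error) {
	if path == "" {
		return nil, errors.New("empty path")
	}
	return []string{"instance-a", "instance-b"}, nil
}

// getInstances wraps and returns the lookup error instead of
// constructing an error value and discarding it.
func getInstances(path string) ([]string, error) {
	values, err := fetchValues(path)
	if err != nil {
		return nil, fmt.Errorf("could not list children of %q: %w", path, err)
	}
	return values, nil
}

func main() {
	if _, err := getInstances(""); err != nil {
		fmt.Println("lookup failed:", err)
	}
}
```

Either returning or logging works; the issue raised in the review is only that the constructed error currently reaches nobody.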
[GitHub] [dubbo-go] AlexStocks commented on a change in pull request #604: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
AlexStocks commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r439799565
##########
File path: common/extension/event_dispatcher.go
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+ "sync"
+
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/common/observer"
+)
+
+var (
+ globalEventDispatcher observer.EventDispatcher
+ initEventListeners []func() observer.EventListener
+ initEventOnce sync.Once
+)
+
+var (
+ dispatchers = make(map[string]func() observer.EventDispatcher, 8)
+)
+
+// SetEventDispatcher by name
+func SetEventDispatcher(name string, v func() observer.EventDispatcher) {
+ dispatchers[name] = v
+}
+
+// SetAndInitGlobalDispatcher
+func SetAndInitGlobalDispatcher(name string) {
+ if len(name) == 0 {
+ name = "direct"
+ }
+ if globalEventDispatcher != nil {
+ logger.Warnf("EventDispatcher already init. It will be replaced")
Review comment:
EventDispatcher has already been initialized. It will be replaced.
##########
File path: common/extension/event_dispatcher.go
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+ "sync"
+
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/common/observer"
+)
+
+var (
+ globalEventDispatcher observer.EventDispatcher
+ initEventListeners []func() observer.EventListener
+ initEventOnce sync.Once
+)
+
+var (
+ dispatchers = make(map[string]func() observer.EventDispatcher, 8)
+)
+
+// SetEventDispatcher by name
+func SetEventDispatcher(name string, v func() observer.EventDispatcher) {
+ dispatchers[name] = v
+}
+
+// SetAndInitGlobalDispatcher
Review comment:
If you do not want to add a comment, just add "nolint", please.
##########
File path: common/extension/event_dispatcher.go
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+ "sync"
+
Review comment:
Split the imports into separate import blocks.
##########
File path: common/extension/event_dispatcher.go
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+ "sync"
+
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/common/observer"
+)
+
+var (
+ globalEventDispatcher observer.EventDispatcher
+ initEventListeners []func() observer.EventListener
+ initEventOnce sync.Once
+)
+
+var (
+ dispatchers = make(map[string]func() observer.EventDispatcher, 8)
+)
+
+// SetEventDispatcher by name
+func SetEventDispatcher(name string, v func() observer.EventDispatcher) {
+ dispatchers[name] = v
+}
+
+// SetAndInitGlobalDispatcher
+func SetAndInitGlobalDispatcher(name string) {
+ if len(name) == 0 {
+ name = "direct"
+ }
+ if globalEventDispatcher != nil {
+ logger.Warnf("EventDispatcher already init. It will be replaced")
+ }
+ if dp, ok := dispatchers[name]; !ok || dp == nil {
+ panic("EventDispatcher for " + name + " is not existing, make sure you have import the package.")
+ }
+ globalEventDispatcher = dispatchers[name]()
+}
+
+// GetGlobalDispatcher will init all listener and then return dispatcher
+func GetGlobalDispatcher() observer.EventDispatcher {
+ initEventOnce.Do(func() {
+ // we should delay to add the listeners to avoid some listeners left
Review comment:
If the comment `we should delay to add the listeners to avoid some listeners left` is Chinglish, I cannot understand what it means.
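One possible reading of the code comment in question, offered only as an assumption: listener factories are collected first and instantiated on the first call, so that factories registered late during start-up are not missed. A minimal sketch of that pattern with `sync.Once` follows; `lazyListeners`, `register`, and `get` are hypothetical names, not identifiers from the PR.

```go
package main

import (
	"fmt"
	"sync"
)

// lazyListeners is a hypothetical holder for listener factories.
// register is not safe for concurrent use; it is meant to be
// called during start-up, before the first get.
type lazyListeners struct {
	factories []func() string
	active    []string
	once      sync.Once
}

// register records a factory; nothing is instantiated yet.
func (l *lazyListeners) register(f func() string) {
	l.factories = append(l.factories, f)
}

// get instantiates every registered factory on the first call only.
// Factories registered after the first call are ignored.
func (l *lazyListeners) get() []string {
	l.once.Do(func() {
		for _, f := range l.factories {
			l.active = append(l.active, f())
		}
	})
	return l.active
}

func main() {
	l := &lazyListeners{}
	l.register(func() string { return "log" })
	l.register(func() string { return "metadata" })
	// Both factories are seen because instantiation was deferred.
	fmt.Println(l.get())
}
```

If this is the intent, a wording such as "defer adding the listeners until first use so that none registered earlier are missed" may be clearer.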
##########
File path: common/extension/service_instance_customizer.go
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+ "sort"
+
Review comment:
split.
##########
File path: metadata/service/remote/service.go
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package remote
+
+import (
+ "sync"
+
+ "go.uber.org/atomic"
Review comment:
split it.
##########
File path: registry/event/customizable_service_instance_listener.go
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ "reflect"
+ "sync"
+
Review comment:
split it.
##########
File path: metadata/service/inmemory/metadata_service_proxy_factory_test.go
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package inmemory
+
+import (
+ "encoding/json"
+ "testing"
+
Review comment:
split it.
##########
File path: registry/servicediscovery/service_discovery_registry_test.go
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package servicediscovery
+
+import (
+ "testing"
+)
+
+var (
+ SERVICE_INTERFACE = "org.apache.dubbo.metadata.MetadataService"
+ GROUP = "dubbo-provider"
+ VERSION = "1.0.0"
+)
+
+func TestServiceDiscoveryRegistry_Register(t *testing.T) {
+ // registryURL,_:=event.NewURL("in-memory://localhost:12345",
Review comment:
What do you want to test?
##########
File path: metadata/service/inmemory/metadata_service_proxy_factory.go
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package inmemory
+
+import (
+ "encoding/json"
+
Review comment:
split it.
##########
File path: registry/event/service_name_mapping_listener.go
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ "reflect"
+ "sync"
+
Review comment:
split it.
##########
File path: metadata/service/service.go
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package service
+
+import (
+ "sync"
+
Review comment:
split it.
##########
File path: common/extension/metadata_service.go
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+ "fmt"
+
Review comment:
split.
##########
File path: registry/event/log_event_listener.go
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ "reflect"
+ "sync"
+
Review comment:
split it.
##########
File path: metadata/service/inmemory/service.go
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package inmemory
+
+import (
+ "sort"
+ "sync"
+
Review comment:
split it.
##########
File path: common/extension/metadata_service_proxy_factory.go
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+ "fmt"
+
Review comment:
split.
##########
File path: common/observer/listenable.go
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package observer
+
+import (
+ "reflect"
+ "sort"
+ "sync"
+)
+
+// Listenable could add and remove the event listener
+type Listenable interface {
+ AddEventListener(listener EventListener)
+ AddEventListeners(listenersSlice []EventListener)
+ RemoveEventListener(listener EventListener)
+ RemoveEventListeners(listenersSlice []EventListener)
+ GetAllEventListeners() []EventListener
+ RemoveAllEventListeners()
+}
+
+// BaseListenable base listenable
+type BaseListenable struct {
Review comment:
Why not use "BaseListener"?
##########
File path: common/observer/listenable.go
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package observer
+
+import (
+ "reflect"
+ "sort"
+ "sync"
+)
+
+// Listenable could add and remove the event listener
+type Listenable interface {
+ AddEventListener(listener EventListener)
+ AddEventListeners(listenersSlice []EventListener)
+ RemoveEventListener(listener EventListener)
+ RemoveEventListeners(listenersSlice []EventListener)
+ GetAllEventListeners() []EventListener
+ RemoveAllEventListeners()
+}
+
+// BaseListenable base listenable
+type BaseListenable struct {
+ Listenable
+ ListenersCache sync.Map
+ Mutex sync.Mutex
+}
+
+// NewBaseListenable a constructor of base listenable
+func NewBaseListenable() Listenable {
+ return &BaseListenable{}
+}
+
+// AddEventListener add event listener
+func (bl *BaseListenable) AddEventListener(listener EventListener) {
+ eventType := listener.GetEventType()
+ if eventType.Kind() == reflect.Ptr {
+ eventType = eventType.Elem()
+ }
+ bl.Mutex.Lock()
+ defer bl.Mutex.Unlock()
+ value, loaded := bl.ListenersCache.LoadOrStore(eventType, make([]EventListener, 0, 8))
Review comment:
ListenersCache: inside the lock scope, you do not need to use sync.Map.
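The point of this comment can be sketched as follows: once every access already goes through the mutex, a plain map gives the same safety with less machinery. This is a minimal sketch, not the code that was merged. The `listenable` type, `Count`, `demoEvent`, `demoListener` and `demoEventType` are hypothetical names, `EventListener` is reduced to the one method used here, and the sketch does not deduplicate or sort listeners.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// EventListener is a reduced stand-in for observer.EventListener;
// only the method this sketch needs is declared.
type EventListener interface {
	GetEventType() reflect.Type
}

// listenable keeps listeners in a plain map. Every access holds mu,
// so no concurrent map type is required.
type listenable struct {
	mu        sync.Mutex
	listeners map[reflect.Type][]EventListener
}

func newListenable() *listenable {
	return &listenable{listeners: make(map[reflect.Type][]EventListener, 8)}
}

// AddEventListener registers listener under its dereferenced event type.
func (l *listenable) AddEventListener(listener EventListener) {
	eventType := listener.GetEventType()
	if eventType.Kind() == reflect.Ptr {
		eventType = eventType.Elem()
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	// append on a nil slice allocates, so no LoadOrStore step is needed.
	l.listeners[eventType] = append(l.listeners[eventType], listener)
}

// Count reports how many listeners are registered for eventType.
func (l *listenable) Count(eventType reflect.Type) int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return len(l.listeners[eventType])
}

// demoEvent and demoListener exist only to exercise the sketch.
type demoEvent struct{}

type demoListener struct{}

func (demoListener) GetEventType() reflect.Type { return reflect.TypeOf(&demoEvent{}) }

var demoEventType = reflect.TypeOf(demoEvent{})

func main() {
	l := newListenable()
	l.AddEventListener(demoListener{})
	l.AddEventListener(demoListener{})
	fmt.Println(l.Count(demoEventType)) // prints 2
}
```

If reads were ever meant to bypass the mutex, the trade-off would change and sync.Map (or an RWMutex) could be reconsidered.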
##########
File path: config/remote_config.go
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+ "time"
+
Review comment:
split it.
##########
File path: config/config_loader.go
##########
@@ -233,39 +250,79 @@ func RPCService(service common.RPCService) {
// GetMetricConfig find the MetricConfig
// if it is nil, create a new one
+// we use double-check to reduce race condition
+// In general, it will be locked 0 or 1 time.
+// So you don't need to worry about the race condition
func GetMetricConfig() *MetricConfig {
- if metricConfig == nil {
- metricConfig = &MetricConfig{}
+ if GetBaseConfig().MetricConfig == nil {
+ configAccessMutex.Lock()
+ defer configAccessMutex.Unlock()
+ if GetBaseConfig().MetricConfig == nil {
+ GetBaseConfig().MetricConfig = &MetricConfig{}
+ }
}
- return metricConfig
+ return GetBaseConfig().MetricConfig
}
// GetApplicationConfig find the application config
// if not, we will create one
// Usually applicationConfig will be initialized when system start
+// we use double-check to reduce race condition
+// In general, it will be locked 0 or 1 time.
+// So you don't need to worry about the race condition
func GetApplicationConfig() *ApplicationConfig {
- if applicationConfig == nil {
- applicationConfig = &ApplicationConfig{}
+ if GetBaseConfig().ApplicationConfig == nil {
+ configAccessMutex.Lock()
+ defer configAccessMutex.Unlock()
+ if GetBaseConfig().ApplicationConfig == nil {
+ GetBaseConfig().ApplicationConfig = &ApplicationConfig{}
+ }
}
- return applicationConfig
+ return GetBaseConfig().ApplicationConfig
}
// GetProviderConfig find the provider config
// if not found, create new one
func GetProviderConfig() ProviderConfig {
if providerConfig == nil {
- logger.Warnf("providerConfig is nil!")
- return ProviderConfig{}
+ logger.Warnf("providerConfig is nil! we will try to create one")
+ if providerConfig == nil {
+ return ProviderConfig{}
+ }
}
return *providerConfig
}
// GetConsumerConfig find the consumer config
// if not found, create new one
+// we use double-check to reduce race condition
+// In general, it will be locked 0 or 1 time.
+// So you don't need to worry about the race condition
func GetConsumerConfig() ConsumerConfig {
if consumerConfig == nil {
- logger.Warnf("consumerConfig is nil!")
- return ConsumerConfig{}
+ if consumerConfig == nil {
+ return ConsumerConfig{}
+ }
}
return *consumerConfig
}
+
+func GetBaseConfig() *BaseConfig {
+
+ if baseConfig == nil {
Review comment:
Please delete this if clause.
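As background for the double-check comments in this hunk: under the Go memory model, the first nil check outside the mutex is still a data race if another goroutine may be writing the same variable. A common alternative for one-time lazy initialization is sync.Once. The following is a minimal sketch under that assumption, not the change the reviewer asked for. `BaseConfig` here is a hypothetical, trimmed stand-in and `baseConfigOnce` is an invented name.

```go
package main

import (
	"fmt"
	"sync"
)

// BaseConfig is a hypothetical stand-in for the config type in the diff.
type BaseConfig struct {
	Name string
}

var (
	baseConfig     *BaseConfig
	baseConfigOnce sync.Once
)

// GetBaseConfig creates the shared instance exactly once. sync.Once
// guarantees that the write inside Do happens before any call to Do returns,
// so callers never observe a partially initialized value.
func GetBaseConfig() *BaseConfig {
	baseConfigOnce.Do(func() {
		baseConfig = &BaseConfig{}
	})
	return baseConfig
}

func main() {
	var wg sync.WaitGroup
	results := make([]*BaseConfig, 8)
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = GetBaseConfig()
		}(i)
	}
	wg.Wait()

	same := true
	for _, r := range results {
		if r != results[0] {
			same = false
		}
	}
	fmt.Println(same) // prints true
}
```

This only covers the case where the getter is the sole writer. If a loader also assigns the variable at startup, that assignment needs the same synchronization.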
##########
File path: common/extension/event_dispatcher.go
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+ "sync"
+
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/common/observer"
+)
+
+var (
+ globalEventDispatcher observer.EventDispatcher
+ initEventListeners []func() observer.EventListener
+ initEventOnce sync.Once
+)
+
+var (
+ dispatchers = make(map[string]func() observer.EventDispatcher, 8)
+)
+
+// SetEventDispatcher by name
+func SetEventDispatcher(name string, v func() observer.EventDispatcher) {
+ dispatchers[name] = v
+}
+
+// SetAndInitGlobalDispatcher
+func SetAndInitGlobalDispatcher(name string) {
+ if len(name) == 0 {
+ name = "direct"
+ }
+ if globalEventDispatcher != nil {
+ logger.Warnf("EventDispatcher already init. It will be replaced")
+ }
+ if dp, ok := dispatchers[name]; !ok || dp == nil {
+ panic("EventDispatcher for " + name + " is not existing, make sure you have import the package.")
Review comment:
does not exist. Pls make sure you have imported the package '_ github.com/apache/dubbo-go/common/extension'.
##########
File path: metadata/mapping/memory/service_name_mapping.go
##########
@@ -18,13 +18,22 @@
package memory
import (
+ "sync"
+
gxset "github.com/dubbogo/gost/container/set"
+
Review comment:
split it.
##########
File path: config/remote_config_test.go
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+ "testing"
+
Review comment:
split.
##########
File path: metadata/mapping/dynamic/service_name_mapping.go
##########
@@ -19,7 +19,12 @@ package dynamic
import (
"strconv"
+ "sync"
"time"
+
Review comment:
split it.
##########
File path: metadata/mapping/dynamic/service_name_mapping.go
##########
@@ -76,7 +87,13 @@ func (d *DynamicConfigurationServiceNameMapping) buildGroup(serviceInterface str
return defaultGroup + slash + serviceInterface
}
-// NewServiceNameMapping will create an instance of DynamicConfigurationServiceNameMapping
-func NewServiceNameMapping(dc config_center.DynamicConfiguration) metadata.ServiceNameMapping {
- return &DynamicConfigurationServiceNameMapping{dc: dc}
+var serviceNameMappingInstance *DynamicConfigurationServiceNameMapping
Review comment:
Please declare vars in a block, as follows:
```Go
var (
)
```
##########
File path: metadata/service/inmemory/service_proxy.go
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package inmemory
+
+import (
+ "context"
+ "reflect"
+ "time"
Review comment:
split it.
##########
File path: metadata/service/remote/service_proxy.go
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package remote
+
+import (
+ "strings"
Review comment:
split it.
##########
File path: registry/event/metadata_service_url_params_customizer.go
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ "encoding/json"
Review comment:
split it.
##########
File path: registry/event/event_publishing_service_discovery.go
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ gxset "github.com/dubbogo/gost/container/set"
+ gxpage "github.com/dubbogo/gost/page"
+
Review comment:
split it.
##########
File path: registry/event/protocol_ports_metadata_customizer.go
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ "encoding/json"
+ "strconv"
+
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/registry"
+)
+
+func init() {
+ extension.AddCustomizers(&ProtocolPortsMetadataCustomizer{})
+}
+
+// ProtocolPortsMetadataCustomizer will update the endpoints
+type ProtocolPortsMetadataCustomizer struct {
+}
+
+// GetPriority will return 0, which means it will be invoked at the beginning
+func (p *ProtocolPortsMetadataCustomizer) GetPriority() int {
+ return 0
+}
+
+// Customize will
+func (p *ProtocolPortsMetadataCustomizer) Customize(instance registry.ServiceInstance) {
+ metadataService, err := getMetadataService()
+ if err != nil {
+ logger.Errorf("Could not init the MetadataService", err)
+ return
+ }
+
+ // 4 is enough...
+ protocolMap := make(map[string]int, 4)
+
+ list, err := metadataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE)
+ if err != nil || len(list) == 0 {
+ logger.Errorf("Could not find exported urls", err)
+ return
+ }
+
+ for _, ui := range list {
+ u, err := common.NewURL(ui.(string))
+ if err != nil || len(u.Protocol) == 0 {
+ logger.Errorf("the url string is invalid: %s", ui.(string), err)
+ continue
+ }
+
+ port, err := strconv.Atoi(u.Port)
+ if err != nil {
+ logger.Errorf("Could not customize the metadata of port. ", err)
+ }
+ protocolMap[u.Protocol] = port
+ }
+
+ instance.GetMetadata()[constant.SERVICE_INSTANCE_ENDPOINTS] = endpointsStr(protocolMap)
+}
+
+func endpointsStr(protocolMap map[string]int) string {
+ if len(protocolMap) == 0 {
+ return ""
+ }
+
+ endpoints := make([]endpoint, 0, len(protocolMap))
+ for k, v := range protocolMap {
+ endpoints = append(endpoints, endpoint{
+ Port: v,
+ Protocol: k,
+ })
+ }
+
+ str, err := json.Marshal(endpoints)
+ if err != nil {
+ logger.Errorf("could not convert the endpoints to json", err)
+ return ""
+ }
+ return string(str)
+}
+
+type endpoint struct {
+ Port int `json:"port"`
Review comment:
Please add `omitempty`.
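The effect of the requested tag can be shown with encoding/json directly. This is a small sketch: the `Protocol` tag is an assumption (the hunk is cut off before that field's tag), and `marshalEndpoint` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// endpoint mirrors the struct in the diff with omitempty added.
// The Protocol tag is assumed; the quoted hunk does not show it.
type endpoint struct {
	Port     int    `json:"port,omitempty"`
	Protocol string `json:"protocol,omitempty"`
}

// marshalEndpoint returns the JSON form of e, or "" on error.
func marshalEndpoint(e endpoint) string {
	b, err := json.Marshal(e)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	fmt.Println(marshalEndpoint(endpoint{Port: 20000, Protocol: "dubbo"}))
	// {"port":20000,"protocol":"dubbo"}
	fmt.Println(marshalEndpoint(endpoint{Protocol: "dubbo"}))
	// {"protocol":"dubbo"}
}
```

With `omitempty`, a zero port is dropped from the output rather than written as `"port":0`, so consumers of the endpoints metadata have to tolerate a missing field.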
##########
File path: registry/event/service_config_exported_event.go
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ "time"
+
Review comment:
split it.
##########
File path: registry/event/log_event_listener_test.go
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ "testing"
+
Review comment:
split it
##########
File path: registry/event/protocol_ports_metadata_customizer.go
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ "encoding/json"
+ "strconv"
Review comment:
split it.
##########
File path: registry/event/service_revision_customizer.go
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ "fmt"
+ "hash/crc32"
+ "sort"
Review comment:
split it.
##########
File path: registry/event/event_publishing_service_deiscovery_test.go
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ "reflect"
+ "testing"
+
Review comment:
split it.
##########
File path: registry/servicediscovery/service_discovery_registry.go
##########
@@ -0,0 +1,693 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package servicediscovery
+
+import (
+ "bytes"
+ "encoding/json"
+ "strconv"
+ "strings"
+ "sync"
+
Review comment:
split it.
[GitHub] [dubbo-go] AlexStocks commented on a change in pull request #604: Ftr: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
AlexStocks commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r452701636
##########
File path: remoting/zookeeper/curator_discovery/service_discovery.go
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package curator_discovery
+
+import (
+ "encoding/json"
+ "path"
+ "strings"
+ "sync"
+
+ "github.com/dubbogo/go-zookeeper/zk"
+)
+
+import (
+ perrors "github.com/pkg/errors"
+)
+
+import (
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/remoting"
+ "github.com/apache/dubbo-go/remoting/zookeeper"
+)
+
+// Entry contain a service instance
+type Entry struct {
+ sync.Mutex
+ instance *ServiceInstance
+}
+
+// ServiceDiscovery is modelled on the one defined in curator-x-discovery, please refer to
+// https://github.com/apache/curator/blob/master/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscovery.java
+// It's not exactly the same as curator-x-discovery's service discovery
+type ServiceDiscovery struct {
+ client *zookeeper.ZookeeperClient
+ mutex *sync.Mutex
+ basePath string
+ services *sync.Map
+ listener *zookeeper.ZkEventListener
+}
+
+// NewServiceDiscovery the constructor of service discovery
+func NewServiceDiscovery(client *zookeeper.ZookeeperClient, basePath string) *ServiceDiscovery {
+ return &ServiceDiscovery{
+ client: client,
+ mutex: &sync.Mutex{},
+ basePath: basePath,
+ services: &sync.Map{},
+ listener: zookeeper.NewZkEventListener(client),
+ }
+}
+
+// registerService register service to zookeeper
+func (sd *ServiceDiscovery) registerService(instance *ServiceInstance) error {
+ path := sd.pathForInstance(instance.Name, instance.Id)
+ data, err := json.Marshal(instance)
+ if err != nil {
+ return err
+ }
+ err = sd.client.CreateTempWithValue(path, data)
+ if err == zk.ErrNodeExists {
+ _, state, _ := sd.client.GetContent(path)
+ if state != nil {
+ _, err = sd.client.SetContent(path, data, state.Version+1)
+ if err != nil {
+ logger.Debugf("Try to update the node data failed. In most cases, it's not a problem. ")
+ }
+ }
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// RegisterService register service to zookeeper, and ensure cache is consistent with zookeeper
+func (sd *ServiceDiscovery) RegisterService(instance *ServiceInstance) error {
+ value, loaded := sd.services.LoadOrStore(instance.Id, &Entry{})
+ entry, ok := value.(*Entry)
+ if !ok {
+ return perrors.New("[ServiceDiscovery] services value not entry")
+ }
+ entry.Lock()
+ defer entry.Unlock()
+ entry.instance = instance
+ err := sd.registerService(instance)
+ if err != nil {
+ return err
+ }
+ if !loaded {
+ sd.ListenServiceInstanceEvent(instance.Name, instance.Id, sd)
+ }
+ return nil
+}
+
+// UpdateService update service in zookeeper, and ensure cache is consistent with zookeeper
+func (sd *ServiceDiscovery) UpdateService(instance *ServiceInstance) error {
+ value, ok := sd.services.Load(instance.Id)
+ if !ok {
+ return perrors.Errorf("[ServiceDiscovery] Service{%s} not registered", instance.Id)
+ }
+ entry, ok := value.(*Entry)
+ if !ok {
+ return perrors.New("[ServiceDiscovery] services value not entry")
+ }
+ entry.Lock()
+ defer entry.Unlock()
+ entry.instance = instance
+ path := sd.pathForInstance(instance.Name, instance.Id)
+ data, err := json.Marshal(instance)
Review comment:
Can you move 'json.Marshal' out of the lock scope?
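What the suggestion amounts to can be sketched as: serialize first, then take the lock only for the state change. This is a minimal sketch under stated assumptions. `ServiceInstance` is trimmed to two fields, the `data` field and the `update` function are hypothetical, and storing the bytes stands in for the write to zookeeper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// ServiceInstance is a trimmed, hypothetical version of the type in the diff.
type ServiceInstance struct {
	Name string `json:"name"`
	Id   string `json:"id"`
}

// entry caches one instance; data is a hypothetical field that stands in
// for the content written to zookeeper.
type entry struct {
	sync.Mutex
	instance *ServiceInstance
	data     []byte
}

// update marshals before locking, so the critical section covers only the
// cache mutation and the (simulated) write.
func update(e *entry, instance *ServiceInstance) error {
	data, err := json.Marshal(instance) // no lock held here
	if err != nil {
		// The cache is untouched when serialization fails.
		return err
	}
	e.Lock()
	defer e.Unlock()
	e.instance = instance
	e.data = data
	return nil
}

func main() {
	e := &entry{}
	inst := &ServiceInstance{Name: "demo", Id: "127.0.0.1:20000"}
	if err := update(e, inst); err != nil {
		fmt.Println("update failed:", err)
		return
	}
	fmt.Println(string(e.data)) // {"name":"demo","id":"127.0.0.1:20000"}
}
```

A side effect of this ordering is that a marshal error no longer leaves the cached instance updated while the remote node still holds the old data.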
[GitHub] [dubbo-go] pantianying commented on a change in pull request #604: Ftr: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
pantianying commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r446862957
##########
File path: registry/zookeeper/service_discovery.go
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+ "fmt"
+ "net/url"
+ "strconv"
+ "strings"
+ "sync"
+)
+
+import (
+ "github.com/dubbogo/gost/container/set"
+ "github.com/dubbogo/gost/page"
+ perrors "github.com/pkg/errors"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/config"
+ "github.com/apache/dubbo-go/registry"
+ "github.com/apache/dubbo-go/remoting"
+ "github.com/apache/dubbo-go/remoting/zookeeper"
+ "github.com/apache/dubbo-go/remoting/zookeeper/curator_discovery"
+)
+
+const (
+ // ServiceDiscoveryZkClient zk client name
+ ServiceDiscoveryZkClient = "zk service discovery"
+)
+
+var (
+ // 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition
+ instanceMap = make(map[string]registry.ServiceDiscovery, 16)
+ initLock sync.Mutex
+)
+
+// init will put the service discovery into extension
+func init() {
+ extension.SetServiceDiscovery(constant.ZOOKEEPER_KEY, newZookeeperServiceDiscovery)
+}
+
+type zookeeperServiceDiscovery struct {
+ client *zookeeper.ZookeeperClient
+ csd *curator_discovery.ServiceDiscovery
+ listener *zookeeper.ZkEventListener
+ url *common.URL
+ wg sync.WaitGroup
+ cltLock sync.Mutex
+ listenLock sync.Mutex
+ done chan struct{}
+ rootPath string
+ listenNames []string
+}
+
+// newZookeeperServiceDiscovery the constructor of newZookeeperServiceDiscovery
+func newZookeeperServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
+ instance, ok := instanceMap[name]
+ if ok {
+ return instance, nil
+ }
+
+ initLock.Lock()
+ defer initLock.Unlock()
+
+ // double check
+ instance, ok = instanceMap[name]
+ if ok {
+ return instance, nil
+ }
+
+ sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name)
+ if !ok || len(sdc.RemoteRef) == 0 {
+ return nil, perrors.New("could not init the instance because the config is invalid")
+ }
+ remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef)
+ if !ok {
+ return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef)
+ }
+ rootPath := remoteConfig.GetParam("rootPath", "/services")
+ url := common.NewURLWithOptions(
+ common.WithParams(make(url.Values)),
+ common.WithPassword(remoteConfig.Password),
+ common.WithUsername(remoteConfig.Username),
+ common.WithParamsValue(constant.REGISTRY_TIMEOUT_KEY, remoteConfig.TimeoutStr))
+ url.Location = remoteConfig.Address
+ zksd := &zookeeperServiceDiscovery{
+ url: url,
+ rootPath: rootPath,
+ }
+ err := zookeeper.ValidateZookeeperClient(zksd, zookeeper.WithZkName(ServiceDiscoveryZkClient))
+ if err != nil {
+ return nil, err
+ }
+ go zookeeper.HandleClientRestart(zksd)
+ zksd.csd = curator_discovery.NewServiceDiscovery(zksd.client, rootPath)
+ return zksd, nil
+}
+
+// nolint
+func (zksd *zookeeperServiceDiscovery) ZkClient() *zookeeper.ZookeeperClient {
+ return zksd.client
+}
+
+// nolint
+func (zksd *zookeeperServiceDiscovery) SetZkClient(client *zookeeper.ZookeeperClient) {
+ zksd.client = client
+}
+
+// nolint
+func (zksd *zookeeperServiceDiscovery) ZkClientLock() *sync.Mutex {
+ return &zksd.cltLock
+}
+
+// nolint
+func (zksd *zookeeperServiceDiscovery) WaitGroup() *sync.WaitGroup {
+ return &zksd.wg
+}
+
+// nolint
+func (zksd *zookeeperServiceDiscovery) Done() chan struct{} {
+ return zksd.done
+}
+
+// RestartCallBack will be invoked when the zookeeper connection is re-established.
+// It tries to re-register the services and to listen to the services again.
+func (zksd *zookeeperServiceDiscovery) RestartCallBack() bool {
+ zksd.csd.ReRegisterServices()
+ zksd.listenLock.Lock()
+ defer zksd.listenLock.Unlock()
+ for _, name := range zksd.listenNames {
+ zksd.csd.ListenServiceEvent(name, zksd)
+ }
+ return true
+}
+
+// nolint
+func (zksd *zookeeperServiceDiscovery) GetUrl() common.URL {
+ return *zksd.url
+}
+
+// nolint
+func (zksd *zookeeperServiceDiscovery) String() string {
+ return fmt.Sprintf("zookeeper-service-discovery[%s]", zksd.url)
+}
+
+// Destroy will close the zookeeper client
+func (zksd *zookeeperServiceDiscovery) Destroy() error {
+ zksd.client.Close()
+ return nil
+}
+
+// Register will register the service in zookeeper. The instance is converted to curator's service instance,
+// which is defined in curator-x-discovery.
+func (zksd *zookeeperServiceDiscovery) Register(instance registry.ServiceInstance) error {
+ cris := zksd.toCuratorInstance(instance)
+ return zksd.csd.RegisterService(cris)
+}
+
+// Update will update the service in zookeeper. The instance is converted to curator's service instance,
+// which is defined in curator-x-discovery, please refer to https://github.com/apache/curator.
+func (zksd *zookeeperServiceDiscovery) Update(instance registry.ServiceInstance) error {
+ cris := zksd.toCuratorInstance(instance)
+ return zksd.csd.UpdateService(cris)
+}
+
+// Unregister will unregister the instance in zookeeper
+func (zksd *zookeeperServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
+ cris := zksd.toCuratorInstance(instance)
+ return zksd.csd.UnregisterService(cris)
+}
+
+// GetDefaultPageSize will return the constant registry.DefaultPageSize
+func (zksd *zookeeperServiceDiscovery) GetDefaultPageSize() int {
+ return registry.DefaultPageSize
+}
+
+// GetServices will return all the services in zookeeper
+func (zksd *zookeeperServiceDiscovery) GetServices() *gxset.HashSet {
+ services, err := zksd.csd.QueryForNames()
+ res := gxset.NewSet()
+ if err != nil {
+ logger.Errorf("[zkServiceDiscovery] Could not query the services: %v", err)
+ return res
+ }
+ for _, service := range services {
+ res.Add(service)
+ }
+ return res
+}
+
+// GetInstances will return the instances in a service
+func (zksd *zookeeperServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
+ criss, err := zksd.csd.QueryForInstances(serviceName)
+ if err != nil {
+ logger.Errorf("[zkServiceDiscovery] Could not query the instances for service{%s}, error = err{%v} ",
+ serviceName, err)
+ return make([]registry.ServiceInstance, 0, 0)
+ }
+ iss := make([]registry.ServiceInstance, 0, len(criss))
+ for _, cris := range criss {
+ iss = append(iss, zksd.toZookeeperInstance(cris))
+ }
+ return iss
+}
+
+// GetInstancesByPage will return the instances
+func (zksd *zookeeperServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
+ all := zksd.GetInstances(serviceName)
+ res := make([]interface{}, 0, pageSize)
+ // could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance
+ for i := offset; i < len(all) && i < offset+pageSize; i++ {
+ res = append(res, all[i])
+ }
+ return gxpage.New(offset, pageSize, res, len(all))
+}
+
+// GetHealthyInstancesByPage will return the instances whose health status matches the healthy parameter.
+// In zookeeper, all service instances are healthy.
+// However, the healthy parameter of this method may be false, so we cannot use that API.
+// Thus, we must query all instances and then filter them.
+func (zksd *zookeeperServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
+ all := zksd.GetInstances(serviceName)
+ res := make([]interface{}, 0, pageSize)
+ // could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance
+ var (
+ i = offset
+ count = 0
+ )
+ for i < len(all) && count < pageSize {
+ ins := all[i]
+ if ins.IsHealthy() == healthy {
+ res = append(res, all[i])
+ count++
+ }
+ i++
+ }
+ return gxpage.New(offset, pageSize, res, len(all))
+}
+
+// GetRequestInstances will return the instances
+func (zksd *zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
+ res := make(map[string]gxpage.Pager, len(serviceNames))
+ for _, name := range serviceNames {
+ res[name] = zksd.GetInstancesByPage(name, offset, requestedSize)
+ }
+ return res
+}
+
+// AddListener will add a data listener for the service via ListenServiceEvent
+func (zksd *zookeeperServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error {
+ zksd.listenLock.Lock()
+ defer zksd.listenLock.Unlock()
+ zksd.listenNames = append(zksd.listenNames, listener.ServiceName)
+ zksd.csd.ListenServiceEvent(listener.ServiceName, zksd)
+ return nil
+}
+
+func (zksd *zookeeperServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
+ return zksd.DispatchEventForInstances(serviceName, zksd.GetInstances(serviceName))
+}
+
+// DispatchEventForInstances dispatch ServiceInstancesChangedEvent
+func (zksd *zookeeperServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
+ return zksd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
+}
+
+// nolint
+func (zksd *zookeeperServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
+ extension.GetGlobalDispatcher().Dispatch(event)
+ return nil
+}
+
+// DataChange implement DataListener's DataChange function
+// to resolve event to do DispatchEventByServiceName
+func (zksd *zookeeperServiceDiscovery) DataChange(eventType remoting.Event) bool {
+ path := strings.TrimPrefix(eventType.Path, zksd.rootPath)
+ path = strings.TrimPrefix(path, constant.PATH_SEPARATOR)
+ // get service name in zk path
+ serviceName := strings.Split(path, constant.PATH_SEPARATOR)[0]
+ err := zksd.DispatchEventByServiceName(serviceName)
+ if err != nil {
+ logger.Errorf("[zkServiceDiscovery] DispatchEventByServiceName{%s} error = err{%v}", serviceName, err)
Review comment:
here should return false?
[GitHub] [dubbo-go] AlexStocks commented on a change in pull request #604: Ftr: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
AlexStocks commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r452895965
##########
File path: registry/servicediscovery/service_discovery_registry.go
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package servicediscovery
+
+import (
+ "bytes"
+ "encoding/json"
+ "strconv"
+ "strings"
+ "sync"
+)
+
+import (
+ cm "github.com/Workiva/go-datastructures/common"
+ gxset "github.com/dubbogo/gost/container/set"
+ gxnet "github.com/dubbogo/gost/net"
+ perrors "github.com/pkg/errors"
+ "go.uber.org/atomic"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/common/observer"
+ "github.com/apache/dubbo-go/config"
+ "github.com/apache/dubbo-go/metadata/mapping"
+ "github.com/apache/dubbo-go/metadata/service"
+ "github.com/apache/dubbo-go/metadata/service/exporter/configurable"
+ "github.com/apache/dubbo-go/registry"
+ "github.com/apache/dubbo-go/registry/event"
+ "github.com/apache/dubbo-go/registry/servicediscovery/synthesizer"
+ "github.com/apache/dubbo-go/remoting"
+)
+
+const (
+ protocolName = "service-discovery"
+)
+
+func init() {
+ extension.SetRegistry(protocolName, newServiceDiscoveryRegistry)
+}
+
+// serviceDiscoveryRegistry is the implementation of application-level registry.
+// It's completely different from other registry implementations
+// This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping
+// In order to keep compatible with interface-level registry,
+// this implementation is
+type serviceDiscoveryRegistry struct {
+ lock sync.RWMutex
+ url *common.URL
+ serviceDiscovery registry.ServiceDiscovery
+ subscribedServices *gxset.HashSet
+ serviceNameMapping mapping.ServiceNameMapping
+ metaDataService service.MetadataService
+ registeredListeners *gxset.HashSet
+ subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer
+ serviceRevisionExportedURLsCache map[string]map[string][]common.URL
+}
+
+func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
+
+ tryInitMetadataService()
+
+ serviceDiscovery, err := creatServiceDiscovery(url)
+ if err != nil {
+ return nil, err
+ }
+ subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, ""))
+ subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer()
+ serviceNameMapping := extension.GetGlobalServiceNameMapping()
+ metaDataService, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "could not init metadata service")
+ }
+ return &serviceDiscoveryRegistry{
+ url: url,
+ serviceDiscovery: serviceDiscovery,
+ subscribedServices: subscribedServices,
+ subscribedURLsSynthesizers: subscribedURLsSynthesizers,
+ registeredListeners: gxset.NewSet(),
+ serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL),
+ serviceNameMapping: serviceNameMapping,
+ metaDataService: metaDataService,
+ }, nil
+}
+
+func (s *serviceDiscoveryRegistry) UnRegister(url common.URL) error {
+ if !shouldRegister(url) {
+ return nil
+ }
+ return s.metaDataService.UnexportURL(url)
+}
+
+func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registry.NotifyListener) error {
+ if !shouldSubscribe(*url) {
+ return nil
+ }
+ return s.metaDataService.UnsubscribeURL(*url)
+}
+
+func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
+ sdcName := url.GetParam(constant.SERVICE_DISCOVERY_KEY, "")
+ sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(sdcName)
+ if !ok {
+ return nil, perrors.Errorf("The service discovery with name: %s is not found", sdcName)
+ }
+ originServiceDiscovery, err := extension.GetServiceDiscovery(sdc.Protocol, sdcName)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "Create service discovery failed")
+ }
+ return event.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil
+}
+
+func parseServices(literalServices string) *gxset.HashSet {
+ set := gxset.NewSet()
+ if len(literalServices) == 0 {
+ return set
+ }
+ var splitServices = strings.Split(literalServices, ",")
+ for _, s := range splitServices {
+ if len(s) != 0 {
+ set.Add(s)
+ }
+ }
+ return set
+}
+
+func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery {
+ return s.serviceDiscovery
+}
+
+func (s *serviceDiscoveryRegistry) GetUrl() common.URL {
+ return *s.url
+}
+
+func (s *serviceDiscoveryRegistry) IsAvailable() bool {
+ // TODO(whether available depends on metadata service and service discovery)
+ return true
+}
+
+func (s *serviceDiscoveryRegistry) Destroy() {
+ err := s.serviceDiscovery.Destroy()
+ if err != nil {
+ logger.Errorf("destroy serviceDiscovery catch error:%s", err.Error())
+ }
+}
+
+func (s *serviceDiscoveryRegistry) Register(url common.URL) error {
+ if !shouldRegister(url) {
+ return nil
+ }
+ ok, err := s.metaDataService.ExportURL(url)
+
+ if err != nil {
+ logger.Errorf("The URL[%s] registry catch error:%s!", url.String(), err.Error())
+ return err
+ }
+ if !ok {
+ logger.Warnf("The URL[%s] has been registered!", url.String())
+ }
+
+ // we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap
+ // But we don't want to design a similar bootstrap class.
+ ins, err := createInstance(url)
+ if err != nil {
+ return perrors.WithMessage(err, "could not create service instance, please check your service url")
+ }
+
+ err = s.serviceDiscovery.Register(ins)
+ if err != nil {
+ return perrors.WithMessage(err, "register the service failed")
+ }
+
+ err = s.metaDataService.PublishServiceDefinition(url)
+ if err != nil {
+ return perrors.WithMessage(err, "publish the service definition failed. ")
+ }
+ return s.serviceNameMapping.Map(url.GetParam(constant.INTERFACE_KEY, ""),
+ url.GetParam(constant.GROUP_KEY, ""),
+ url.GetParam(constant.Version, ""),
+ url.Protocol)
+}
+
+func createInstance(url common.URL) (registry.ServiceInstance, error) {
+ appConfig := config.GetApplicationConfig()
+ port, err := strconv.ParseInt(url.Port, 10, 32)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "invalid port: "+url.Port)
+ }
+
+ host := url.Ip
+ if len(host) == 0 {
+ host, err = gxnet.GetLocalIP()
+ if err != nil {
+ return nil, perrors.WithMessage(err, "could not get the local Ip")
+ }
+ }
+
+ // usually we will add more metadata
+ metadata := make(map[string]string, 8)
+ metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType
+
+ return &registry.DefaultServiceInstance{
+ ServiceName: appConfig.Name,
+ Host: host,
+ Port: int(port),
+ Id: host + constant.KEY_SEPARATOR + url.Port,
+ Enable: true,
+ Healthy: true,
+ Metadata: metadata,
+ }, nil
+}
+
+func shouldRegister(url common.URL) bool {
+ side := url.GetParam(constant.SIDE_KEY, "")
+ if side == constant.PROVIDER_PROTOCOL {
+ return true
+ }
+ logger.Debugf("The URL[%s] should not be registered.", url.String())
+ return false
+}
+
+func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) error {
+ if !shouldSubscribe(*url) {
+ return nil
+ }
+ _, err := s.metaDataService.SubscribeURL(*url)
+ if err != nil {
+ return perrors.WithMessage(err, "subscribe url error: "+url.String())
+ }
+ services := s.getServices(*url)
+ if services.Empty() {
+ return perrors.Errorf("There should be at least one way to know which services this interface belongs to, "+
+ "subscription url:%s", url.String())
+ }
+ for _, srv := range services.Values() {
+ serviceName := srv.(string)
+ serviceInstances := s.serviceDiscovery.GetInstances(serviceName)
+ s.subscribe(url, notify, serviceName, serviceInstances)
+ listener := &registry.ServiceInstancesChangedListener{
+ ServiceName: serviceName,
+ ChangedNotify: &InstanceChangeNotify{
+ notify: notify,
+ serviceDiscoveryRegistry: s,
+ },
+ }
+ s.registerServiceInstancesChangedListener(*url, listener)
+ }
+
+ return nil
+}
+func (s *serviceDiscoveryRegistry) registerServiceInstancesChangedListener(url common.URL, listener *registry.ServiceInstancesChangedListener) {
+ listenerId := listener.ServiceName + ":" + getUrlKey(url)
+ if !s.subscribedServices.Contains(listenerId) {
+ err := s.serviceDiscovery.AddListener(listener)
+ if err != nil {
+ logger.Errorf("add listener[%s] catch error,url:%s err:%s", listenerId, url.String(), err.Error())
+ }
+ }
+
+}
+
+func getUrlKey(url common.URL) string {
+ var bf bytes.Buffer
+ if len(url.Protocol) != 0 {
+ bf.WriteString(url.Protocol)
+ bf.WriteString("://")
+ }
+ if len(url.Location) != 0 {
+ bf.WriteString(url.Location)
+ bf.WriteString(":")
+ bf.WriteString(url.Port)
+ }
+ if len(url.Path) != 0 {
+ bf.WriteString("/")
+ bf.WriteString(url.Path)
+ }
+ bf.WriteString("?")
+ appendParam(bf, constant.VERSION_KEY, url)
+ appendParam(bf, constant.GROUP_KEY, url)
+ appendParam(bf, constant.NACOS_PROTOCOL_KEY, url)
+ return bf.String()
+}
+
+func appendParam(buffer bytes.Buffer, paramKey string, url common.URL) {
+ buffer.WriteString(paramKey)
+ buffer.WriteString("=")
+ buffer.WriteString(url.GetParam(paramKey, ""))
+}
+
+func (s *serviceDiscoveryRegistry) subscribe(url *common.URL, notify registry.NotifyListener,
+ serviceName string, serviceInstances []registry.ServiceInstance) {
+ if len(serviceInstances) == 0 {
+ logger.Warnf("there is no instance in service[name : %s]", serviceName)
+ return
+ }
+ var subscribedURLs []common.URL
+ subscribedURLs = append(subscribedURLs, s.getExportedUrls(*url, serviceInstances)...)
+ if len(subscribedURLs) == 0 {
+ subscribedURLs = s.synthesizeSubscribedURLs(url, serviceInstances)
+ }
+ // TODO make sure it's workable
+ for _, url := range subscribedURLs {
+ notify.Notify(&registry.ServiceEvent{
+ Action: remoting.EventTypeAdd,
+ Service: url,
+ })
+ }
+
+}
+
+func (s *serviceDiscoveryRegistry) synthesizeSubscribedURLs(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []common.URL {
+ var urls []common.URL
+ for _, syn := range s.subscribedURLsSynthesizers {
+ if syn.Support(subscribedURL) {
+ urls = append(urls, syn.Synthesize(subscribedURL, serviceInstances)...)
+ }
+ }
+ return urls
+}
Review comment:
pls add a blank line
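A side note on the key-building helper quoted in this hunk: in Go it matters whether such a helper receives a bytes.Buffer or a *bytes.Buffer. The sketch below is illustrative only; appendByValue, appendByPointer, and the sample URL are hypothetical and not part of the PR. It shows that writes made through a by-value copy do not reach the caller's buffer:

```go
package main

import (
	"bytes"
	"fmt"
)

// appendByValue receives a copy of the buffer, so the caller never sees the write.
func appendByValue(buf bytes.Buffer, key, value string) {
	buf.WriteString(key + "=" + value)
}

// appendByPointer writes into the caller's buffer.
func appendByPointer(buf *bytes.Buffer, key, value string) {
	buf.WriteString(key + "=" + value)
}

// keyByValue builds a key with the by-value helper; the parameter is lost.
func keyByValue() string {
	var bf bytes.Buffer
	bf.WriteString("dubbo://host:20000/svc?")
	appendByValue(bf, "version", "1.0.0")
	return bf.String()
}

// keyByPointer builds a key with the pointer helper; the parameter is kept.
func keyByPointer() string {
	var bf bytes.Buffer
	bf.WriteString("dubbo://host:20000/svc?")
	appendByPointer(&bf, "version", "1.0.0")
	return bf.String()
}

func main() {
	fmt.Println(keyByValue())   // prints dubbo://host:20000/svc?
	fmt.Println(keyByPointer()) // prints dubbo://host:20000/svc?version=1.0.0
}
```

If the listener key is expected to include the version, group, and protocol parameters, passing the buffer by pointer (or returning the string from the helper) would be one way to keep them.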
##########
File path: registry/servicediscovery/service_discovery_registry.go
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package servicediscovery
+
+import (
+ "bytes"
+ "encoding/json"
+ "strconv"
+ "strings"
+ "sync"
+)
+
+import (
+ cm "github.com/Workiva/go-datastructures/common"
+ gxset "github.com/dubbogo/gost/container/set"
+ gxnet "github.com/dubbogo/gost/net"
+ perrors "github.com/pkg/errors"
+ "go.uber.org/atomic"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/common/observer"
+ "github.com/apache/dubbo-go/config"
+ "github.com/apache/dubbo-go/metadata/mapping"
+ "github.com/apache/dubbo-go/metadata/service"
+ "github.com/apache/dubbo-go/metadata/service/exporter/configurable"
+ "github.com/apache/dubbo-go/registry"
+ "github.com/apache/dubbo-go/registry/event"
+ "github.com/apache/dubbo-go/registry/servicediscovery/synthesizer"
+ "github.com/apache/dubbo-go/remoting"
+)
+
+const (
+ protocolName = "service-discovery"
+)
+
+func init() {
+ extension.SetRegistry(protocolName, newServiceDiscoveryRegistry)
+}
+
+// serviceDiscoveryRegistry is the implementation of application-level registry.
+// It's completely different from other registry implementations
+// This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping
+// In order to keep compatible with interface-level registry,
+// this implementation is
+type serviceDiscoveryRegistry struct {
+ lock sync.RWMutex
+ url *common.URL
+ serviceDiscovery registry.ServiceDiscovery
+ subscribedServices *gxset.HashSet
+ serviceNameMapping mapping.ServiceNameMapping
+ metaDataService service.MetadataService
+ registeredListeners *gxset.HashSet
+ subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer
+ serviceRevisionExportedURLsCache map[string]map[string][]common.URL
+}
+
+func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
+
+ tryInitMetadataService()
+
+ serviceDiscovery, err := creatServiceDiscovery(url)
+ if err != nil {
+ return nil, err
+ }
+ subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, ""))
+ subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer()
+ serviceNameMapping := extension.GetGlobalServiceNameMapping()
+ metaDataService, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "could not init metadata service")
+ }
+ return &serviceDiscoveryRegistry{
+ url: url,
+ serviceDiscovery: serviceDiscovery,
+ subscribedServices: subscribedServices,
+ subscribedURLsSynthesizers: subscribedURLsSynthesizers,
+ registeredListeners: gxset.NewSet(),
+ serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL),
+ serviceNameMapping: serviceNameMapping,
+ metaDataService: metaDataService,
+ }, nil
+}
+
+func (s *serviceDiscoveryRegistry) UnRegister(url common.URL) error {
+ if !shouldRegister(url) {
+ return nil
+ }
+ return s.metaDataService.UnexportURL(url)
+}
+
+func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registry.NotifyListener) error {
+ if !shouldSubscribe(*url) {
+ return nil
+ }
+ return s.metaDataService.UnsubscribeURL(*url)
+}
+
+func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
+ sdcName := url.GetParam(constant.SERVICE_DISCOVERY_KEY, "")
+ sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(sdcName)
+ if !ok {
+ return nil, perrors.Errorf("The service discovery with name: %s is not found", sdcName)
+ }
+ originServiceDiscovery, err := extension.GetServiceDiscovery(sdc.Protocol, sdcName)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "Create service discovery failed")
+ }
+ return event.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil
+}
+
+func parseServices(literalServices string) *gxset.HashSet {
+ set := gxset.NewSet()
+ if len(literalServices) == 0 {
+ return set
+ }
+ var splitServices = strings.Split(literalServices, ",")
+ for _, s := range splitServices {
+ if len(s) != 0 {
+ set.Add(s)
+ }
+ }
+ return set
+}
+
+func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery {
+ return s.serviceDiscovery
+}
+
+func (s *serviceDiscoveryRegistry) GetUrl() common.URL {
+ return *s.url
+}
+
+func (s *serviceDiscoveryRegistry) IsAvailable() bool {
+ // TODO(whether available depends on metadata service and service discovery)
+ return true
+}
+
+func (s *serviceDiscoveryRegistry) Destroy() {
+ err := s.serviceDiscovery.Destroy()
+ if err != nil {
+ logger.Errorf("destroy serviceDiscovery catch error:%s", err.Error())
+ }
+}
+
+func (s *serviceDiscoveryRegistry) Register(url common.URL) error {
+ if !shouldRegister(url) {
+ return nil
+ }
+ ok, err := s.metaDataService.ExportURL(url)
+
+ if err != nil {
+ logger.Errorf("The URL[%s] registry catch error:%s!", url.String(), err.Error())
+ return err
+ }
+ if !ok {
+ logger.Warnf("The URL[%s] has been registered!", url.String())
+ }
+
+ // we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap
+ // But we don't want to design a similar bootstrap class.
+ ins, err := createInstance(url)
+ if err != nil {
+ return perrors.WithMessage(err, "could not create service instance, please check your service url")
+ }
+
+ err = s.serviceDiscovery.Register(ins)
+ if err != nil {
+ return perrors.WithMessage(err, "register the service failed")
+ }
+
+ err = s.metaDataService.PublishServiceDefinition(url)
+ if err != nil {
+ return perrors.WithMessage(err, "publish the service definition failed. ")
+ }
+ return s.serviceNameMapping.Map(url.GetParam(constant.INTERFACE_KEY, ""),
+ url.GetParam(constant.GROUP_KEY, ""),
+ url.GetParam(constant.Version, ""),
+ url.Protocol)
+}
+
+func createInstance(url common.URL) (registry.ServiceInstance, error) {
+ appConfig := config.GetApplicationConfig()
+ port, err := strconv.ParseInt(url.Port, 10, 32)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "invalid port: "+url.Port)
+ }
+
+ host := url.Ip
+ if len(host) == 0 {
+ host, err = gxnet.GetLocalIP()
+ if err != nil {
+ return nil, perrors.WithMessage(err, "could not get the local Ip")
+ }
+ }
+
+ // usually we will add more metadata
+ metadata := make(map[string]string, 8)
+ metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType
+
+ return &registry.DefaultServiceInstance{
+ ServiceName: appConfig.Name,
+ Host: host,
+ Port: int(port),
+ Id: host + constant.KEY_SEPARATOR + url.Port,
+ Enable: true,
+ Healthy: true,
+ Metadata: metadata,
+ }, nil
+}
+
+func shouldRegister(url common.URL) bool {
+ side := url.GetParam(constant.SIDE_KEY, "")
+ if side == constant.PROVIDER_PROTOCOL {
+ return true
+ }
+ logger.Debugf("The URL[%s] should not be registered.", url.String())
+ return false
+}
+
+func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) error {
+ if !shouldSubscribe(*url) {
+ return nil
+ }
+ _, err := s.metaDataService.SubscribeURL(*url)
+ if err != nil {
+ return perrors.WithMessage(err, "subscribe url error: "+url.String())
+ }
+ services := s.getServices(*url)
+ if services.Empty() {
+ return perrors.Errorf("There should be at least one way to know which services this interface belongs to, "+
+ "subscription url:%s", url.String())
+ }
+ for _, srv := range services.Values() {
+ serviceName := srv.(string)
+ serviceInstances := s.serviceDiscovery.GetInstances(serviceName)
+ s.subscribe(url, notify, serviceName, serviceInstances)
+ listener := &registry.ServiceInstancesChangedListener{
+ ServiceName: serviceName,
+ ChangedNotify: &InstanceChangeNotify{
+ notify: notify,
+ serviceDiscoveryRegistry: s,
+ },
+ }
+ s.registerServiceInstancesChangedListener(*url, listener)
+ }
+
+ return nil
+}
Review comment:
pls add a blank line.
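For readers following the hunk, the way parseServices turns the subscribed-service-names parameter into a set can be sketched without the gost dependency. This is an illustration under assumptions: parseServiceNames and the sample input are hypothetical, and a plain map stands in for gxset.HashSet:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// parseServiceNames splits a comma-separated list and drops empty entries.
// Like the hunk, it does not trim whitespace around the names.
func parseServiceNames(literal string) map[string]struct{} {
	set := make(map[string]struct{})
	if len(literal) == 0 {
		return set
	}
	for _, name := range strings.Split(literal, ",") {
		if len(name) != 0 {
			set[name] = struct{}{}
		}
	}
	return set
}

func main() {
	set := parseServiceNames("app-a,,app-b,app-a")
	names := make([]string, 0, len(set))
	for name := range set {
		names = append(names, name)
	}
	sort.Strings(names) // map iteration order is random, so sort before printing
	fmt.Println(names)  // prints [app-a app-b]
}
```

Duplicates and empty segments collapse naturally because the result is a set; an empty parameter yields an empty set.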
[GitHub] [dubbo-go] pantianying commented on a change in pull request #604: Ftr: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
pantianying commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r446862957
##########
File path: registry/zookeeper/service_discovery.go
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+ "fmt"
+ "net/url"
+ "strconv"
+ "strings"
+ "sync"
+)
+
+import (
+ "github.com/dubbogo/gost/container/set"
+ "github.com/dubbogo/gost/page"
+ perrors "github.com/pkg/errors"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/config"
+ "github.com/apache/dubbo-go/registry"
+ "github.com/apache/dubbo-go/remoting"
+ "github.com/apache/dubbo-go/remoting/zookeeper"
+ "github.com/apache/dubbo-go/remoting/zookeeper/curator_discovery"
+)
+
+const (
+ // ServiceDiscoveryZkClient zk client name
+ ServiceDiscoveryZkClient = "zk service discovery"
+)
+
+var (
+ // 16 would be enough. We won't use concurrentMap because in most cases, there is no race condition
+ instanceMap = make(map[string]registry.ServiceDiscovery, 16)
+ initLock sync.Mutex
+)
+
+// init will put the service discovery into extension
+func init() {
+ extension.SetServiceDiscovery(constant.ZOOKEEPER_KEY, newZookeeperServiceDiscovery)
+}
+
+type zookeeperServiceDiscovery struct {
+ client *zookeeper.ZookeeperClient
+ csd *curator_discovery.ServiceDiscovery
+ listener *zookeeper.ZkEventListener
+ url *common.URL
+ wg sync.WaitGroup
+ cltLock sync.Mutex
+ listenLock sync.Mutex
+ done chan struct{}
+ rootPath string
+ listenNames []string
+}
+
+// newZookeeperServiceDiscovery is the constructor of zookeeperServiceDiscovery
+func newZookeeperServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
+ instance, ok := instanceMap[name]
+ if ok {
+ return instance, nil
+ }
+
+ initLock.Lock()
+ defer initLock.Unlock()
+
+ // double check
+ instance, ok = instanceMap[name]
+ if ok {
+ return instance, nil
+ }
+
+ sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name)
+ if !ok || len(sdc.RemoteRef) == 0 {
+ return nil, perrors.New("could not init the instance because the config is invalid")
+ }
+ remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef)
+ if !ok {
+ return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef)
+ }
+ rootPath := remoteConfig.GetParam("rootPath", "/services")
+ url := common.NewURLWithOptions(
+ common.WithParams(make(url.Values)),
+ common.WithPassword(remoteConfig.Password),
+ common.WithUsername(remoteConfig.Username),
+ common.WithParamsValue(constant.REGISTRY_TIMEOUT_KEY, remoteConfig.TimeoutStr))
+ url.Location = remoteConfig.Address
+ zksd := &zookeeperServiceDiscovery{
+ url: url,
+ rootPath: rootPath,
+ }
+ err := zookeeper.ValidateZookeeperClient(zksd, zookeeper.WithZkName(ServiceDiscoveryZkClient))
+ if err != nil {
+ return nil, err
+ }
+ go zookeeper.HandleClientRestart(zksd)
+ zksd.csd = curator_discovery.NewServiceDiscovery(zksd.client, rootPath)
+ return zksd, nil
+}
+
+// nolint
+func (zksd *zookeeperServiceDiscovery) ZkClient() *zookeeper.ZookeeperClient {
+ return zksd.client
+}
+
+// nolint
+func (zksd *zookeeperServiceDiscovery) SetZkClient(client *zookeeper.ZookeeperClient) {
+ zksd.client = client
+}
+
+// nolint
+func (zksd *zookeeperServiceDiscovery) ZkClientLock() *sync.Mutex {
+ return &zksd.cltLock
+}
+
+// nolint
+func (zksd *zookeeperServiceDiscovery) WaitGroup() *sync.WaitGroup {
+ return &zksd.wg
+}
+
+// nolint
+func (zksd *zookeeperServiceDiscovery) Done() chan struct{} {
+ return zksd.done
+}
+
+// RestartCallBack will be invoked when the zookeeper connection is re-established.
+// It tries to re-register the services and to listen to the services again.
+func (zksd *zookeeperServiceDiscovery) RestartCallBack() bool {
+ zksd.csd.ReRegisterServices()
+ zksd.listenLock.Lock()
+ defer zksd.listenLock.Unlock()
+ for _, name := range zksd.listenNames {
+ zksd.csd.ListenServiceEvent(name, zksd)
+ }
+ return true
+}
+
+// nolint
+func (zksd *zookeeperServiceDiscovery) GetUrl() common.URL {
+ return *zksd.url
+}
+
+// nolint
+func (zksd *zookeeperServiceDiscovery) String() string {
+ return fmt.Sprintf("zookeeper-service-discovery[%s]", zksd.url)
+}
+
+// Destroy closes the zookeeper client
+func (zksd *zookeeperServiceDiscovery) Destroy() error {
+ zksd.client.Close()
+ return nil
+}
+
+// Register will register service in zookeeper, instance convert to curator's service instance
+// which define in curator-x-discovery.
+func (zksd *zookeeperServiceDiscovery) Register(instance registry.ServiceInstance) error {
+ cris := zksd.toCuratorInstance(instance)
+ return zksd.csd.RegisterService(cris)
+}
+
+// Update will update the service in zookeeper; the instance is converted to curator's service instance,
+// which is defined in curator-x-discovery, please refer to https://github.com/apache/curator.
+func (zksd *zookeeperServiceDiscovery) Update(instance registry.ServiceInstance) error {
+ cris := zksd.toCuratorInstance(instance)
+ return zksd.csd.UpdateService(cris)
+}
+
+// Unregister will unregister the instance in zookeeper
+func (zksd *zookeeperServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
+ cris := zksd.toCuratorInstance(instance)
+ return zksd.csd.UnregisterService(cris)
+}
+
+// GetDefaultPageSize will return the constant registry.DefaultPageSize
+func (zksd *zookeeperServiceDiscovery) GetDefaultPageSize() int {
+ return registry.DefaultPageSize
+}
+
+// GetServices will return the all services in zookeeper
+func (zksd *zookeeperServiceDiscovery) GetServices() *gxset.HashSet {
+ services, err := zksd.csd.QueryForNames()
+ res := gxset.NewSet()
+ if err != nil {
+ logger.Errorf("[zkServiceDiscovery] Could not query the services: %v", err)
+ return res
+ }
+ for _, service := range services {
+ res.Add(service)
+ }
+ return res
+}
+
+// GetInstances will return the instances in a service
+func (zksd *zookeeperServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
+ criss, err := zksd.csd.QueryForInstances(serviceName)
+ if err != nil {
+ logger.Errorf("[zkServiceDiscovery] Could not query the instances for service{%s}, error = err{%v} ",
+ serviceName, err)
+ return make([]registry.ServiceInstance, 0, 0)
+ }
+ iss := make([]registry.ServiceInstance, 0, len(criss))
+ for _, cris := range criss {
+ iss = append(iss, zksd.toZookeeperInstance(cris))
+ }
+ return iss
+}
+
+// GetInstancesByPage will return the instances
+func (zksd *zookeeperServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
+ all := zksd.GetInstances(serviceName)
+ res := make([]interface{}, 0, pageSize)
+ // could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance
+ for i := offset; i < len(all) && i < offset+pageSize; i++ {
+ res = append(res, all[i])
+ }
+ return gxpage.New(offset, pageSize, res, len(all))
+}
+
+// GetHealthyInstancesByPage will return the instances
+// In zookeeper, all service instances are healthy.
+// However, the healthy parameter in this method may be false, so we can not use that API.
+// Thus, we must query all instances and then filter them
+func (zksd *zookeeperServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
+ all := zksd.GetInstances(serviceName)
+ res := make([]interface{}, 0, pageSize)
+ // could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance
+ var (
+ i = offset
+ count = 0
+ )
+ for i < len(all) && count < pageSize {
+ ins := all[i]
+ if ins.IsHealthy() == healthy {
+ res = append(res, all[i])
+ count++
+ }
+ i++
+ }
+ return gxpage.New(offset, pageSize, res, len(all))
+}
+
+// GetRequestInstances will return the instances
+func (zksd *zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
+ res := make(map[string]gxpage.Pager, len(serviceNames))
+ for _, name := range serviceNames {
+ res[name] = zksd.GetInstancesByPage(name, offset, requestedSize)
+ }
+ return res
+}
+
+// AddListener ListenServiceEvent will add a data listener in service
+func (zksd *zookeeperServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error {
+ zksd.listenLock.Lock()
+ defer zksd.listenLock.Unlock()
+ zksd.listenNames = append(zksd.listenNames, listener.ServiceName)
+ zksd.csd.ListenServiceEvent(listener.ServiceName, zksd)
+ return nil
+}
+
+func (zksd *zookeeperServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
+ return zksd.DispatchEventForInstances(serviceName, zksd.GetInstances(serviceName))
+}
+
+// DispatchEventForInstances dispatch ServiceInstancesChangedEvent
+func (zksd *zookeeperServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
+ return zksd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
+}
+
+// nolint
+func (zksd *zookeeperServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
+ extension.GetGlobalDispatcher().Dispatch(event)
+ return nil
+}
+
+// DataChange implement DataListener's DataChange function
+// to resolve event to do DispatchEventByServiceName
+func (zksd *zookeeperServiceDiscovery) DataChange(eventType remoting.Event) bool {
+ path := strings.TrimPrefix(eventType.Path, zksd.rootPath)
+ path = strings.TrimPrefix(path, constant.PATH_SEPARATOR)
+ // get service name in zk path
+ serviceName := strings.Split(path, constant.PATH_SEPARATOR)[0]
+ err := zksd.DispatchEventByServiceName(serviceName)
+ if err != nil {
+ logger.Errorf("[zkServiceDiscovery] DispatchEventByServiceName{%s} error = err{%v}", serviceName, err)
Review comment:
This should return false here.
[GitHub] [dubbo-go] AlexStocks commented on a change in pull request #604: Ftr: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
AlexStocks commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r449813818
##########
File path: common/extension/metadata_service_proxy_factory.go
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+ "fmt"
+)
+
+import (
+ "github.com/apache/dubbo-go/metadata/service"
+)
+
+var (
+ metadataServiceProxyFactoryMap = make(map[string]func() service.MetadataServiceProxyFactory)
Review comment:
Hi @flycash, there are two problems here. First, we should initialize the map with a capacity hint, as follows:
```Go
metadataServiceProxyFactoryMap = make(map[string]func() service.MetadataServiceProxyFactory, 4)
```
Second, we should define a func type, as follows:
```Go
type MetadataServiceProxyFactoryFunc func() service.MetadataServiceProxyFactory
```
Then you can use this func type throughout the package to simplify the code.
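Putting both suggestions together, the registry could look roughly like the sketch below. The interface here is a local stand-in for service.MetadataServiceProxyFactory, and the Set/Get function names and the `demoFactory` type are assumptions made for illustration only.

```go
package main

import "fmt"

// MetadataServiceProxyFactory is a hypothetical stand-in for
// service.MetadataServiceProxyFactory.
type MetadataServiceProxyFactory interface {
	Name() string
}

// MetadataServiceProxyFactoryFunc is the func type suggested in the review.
type MetadataServiceProxyFactoryFunc func() MetadataServiceProxyFactory

// The map is created with a small capacity hint, as suggested in the review.
var metadataServiceProxyFactoryMap = make(map[string]MetadataServiceProxyFactoryFunc, 4)

// SetMetadataServiceProxyFactory registers a creator under the given name
// (assumed function name).
func SetMetadataServiceProxyFactory(name string, creator MetadataServiceProxyFactoryFunc) {
	metadataServiceProxyFactoryMap[name] = creator
}

// GetMetadataServiceProxyFactory looks up the creator and invokes it
// (assumed function name).
func GetMetadataServiceProxyFactory(name string) (MetadataServiceProxyFactory, error) {
	creator, ok := metadataServiceProxyFactoryMap[name]
	if !ok {
		return nil, fmt.Errorf("no metadata service proxy factory registered for name %q", name)
	}
	return creator(), nil
}

// demoFactory is a hypothetical implementation used only in this sketch.
type demoFactory struct{}

func (demoFactory) Name() string { return "demo" }

func main() {
	SetMetadataServiceProxyFactory("demo", func() MetadataServiceProxyFactory { return demoFactory{} })
	f, err := GetMetadataServiceProxyFactory("demo")
	fmt.Println(f.Name(), err)
}
```

The named func type keeps the map declaration and the setter signature short, and gives the creator a single place to be documented.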
##########
File path: common/observer/dispatcher/direct_event_dispatcher_test.go
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dispatcher
+
+import (
+ "fmt"
+ "reflect"
+ "testing"
+)
+
+import (
+ "github.com/apache/dubbo-go/common/observer"
+)
+
+func TestDirectEventDispatcher_Dispatch(t *testing.T) {
+ ded := NewDirectEventDispatcher()
+ ded.AddEventListener(&TestEventListener{
+ BaseListener: observer.NewBaseListener(),
+ })
+ ded.AddEventListener(&TestEventListener1{})
+ ded.Dispatch(&TestEvent{})
+ ded.Dispatch(nil)
+}
+
+type TestEvent struct {
+ observer.BaseEvent
+}
+
+type TestEventListener struct {
+ observer.BaseListener
+ observer.EventListener
+}
+
+func (tel *TestEventListener) OnEvent(e observer.Event) error {
+ fmt.Println("TestEventListener")
Review comment:
This is just for testing; there is no need to delete it.
[GitHub] [dubbo-go] hxmhlt commented on a change in pull request #604: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
hxmhlt commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r439799174
##########
File path: common/extension/metadata_service.go
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+ "fmt"
+
+ "github.com/apache/dubbo-go/metadata/service"
+)
Review comment:
Split the imports into separate blocks: standard library packages first, then dubbo-go packages.
##########
File path: common/extension/event_dispatcher.go
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+ "sync"
Review comment:
Split the imports into separate blocks.
##########
File path: metadata/mapping/memory/service_name_mapping.go
##########
@@ -18,13 +18,22 @@
package memory
import (
+ "sync"
+
gxset "github.com/dubbogo/gost/container/set"
+
+ "github.com/apache/dubbo-go/metadata/mapping"
)
Review comment:
Split the imports into separate blocks: standard library, third-party, and dubbo-go packages.
##########
File path: common/extension/service_instance_customizer.go
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+ "sort"
+
+ "github.com/apache/dubbo-go/registry"
Review comment:
Split the imports into separate blocks: standard library packages first, then dubbo-go packages.
##########
File path: config/base_config.go
##########
@@ -64,7 +83,7 @@ func (c *BaseConfig) startConfigCenter() error {
if c.prepareEnvironment() != nil {
return perrors.WithMessagef(err, "start config center error!")
}
- //c.fresh()
+ // c.fresh()
return err
Review comment:
why delete fresh?
##########
File path: common/extension/metadata_service_proxy_factory.go
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+ "fmt"
+
+ "github.com/apache/dubbo-go/metadata/service"
Review comment:
Split the imports into separate blocks: standard library packages first, then dubbo-go packages.
[GitHub] [dubbo-go] fangyincheng commented on a change in pull request #604: Ftr: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
fangyincheng commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r443999094
##########
File path: common/observer/dispatcher/direct_event_dispatcher_test.go
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dispatcher
+
+import (
+ "fmt"
+ "reflect"
+ "testing"
+)
+
+import (
+ "github.com/apache/dubbo-go/common/observer"
+)
+
+func TestDirectEventDispatcher_Dispatch(t *testing.T) {
+ ded := NewDirectEventDispatcher()
+ ded.AddEventListener(&TestEventListener{
+ BaseListener: observer.NewBaseListener(),
+ })
+ ded.AddEventListener(&TestEventListener1{})
+ ded.Dispatch(&TestEvent{})
+ ded.Dispatch(nil)
+}
+
+type TestEvent struct {
+ observer.BaseEvent
+}
+
+type TestEventListener struct {
+ observer.BaseListener
+ observer.EventListener
+}
+
+func (tel *TestEventListener) OnEvent(e observer.Event) error {
+ fmt.Println("TestEventListener")
Review comment:
delete "fmt.Println"?
[GitHub] [dubbo-go] zouyx commented on a change in pull request #604: Ftr: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
zouyx commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r453206146
##########
File path: common/observer/listenable.go
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package observer
+
+import (
+ "reflect"
+ "sort"
+ "sync"
+)
+
+// Listenable could add and remove the event listener
+type Listenable interface {
+ AddEventListener(listener EventListener)
+ AddEventListeners(listenersSlice []EventListener)
+ RemoveEventListener(listener EventListener)
+ RemoveEventListeners(listenersSlice []EventListener)
+ GetAllEventListeners() []EventListener
+ RemoveAllEventListeners()
+}
+
+// BaseListener base listenable
+type BaseListener struct {
+ Listenable
+ ListenersCache map[reflect.Type][]EventListener
+ Mutex sync.RWMutex
Review comment:
Why is this lock public?
##########
File path: metadata/mapping/dynamic/service_name_mapping.go
##########
@@ -28,18 +29,26 @@ import (
)
import (
+ commonCfg "github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/config_center"
- "github.com/apache/dubbo-go/metadata"
+ "github.com/apache/dubbo-go/metadata/mapping"
)
const (
- defaultGroup = config_center.DEFAULT_GROUP
+ defaultGroup = "mapping"
Review comment:
Will changing this constant cause compatibility issues?
##########
File path: registry/servicediscovery/synthesizer/subscribed_urls_synthesizer_factory.go
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package synthesizer
+
+var (
+ synthesizers []SubscribedURLsSynthesizer
+)
+
+// nolint
+func AddSynthesizer(synthesizer SubscribedURLsSynthesizer) {
+ synthesizers = append(synthesizers, synthesizer)
Review comment:
Should you add a lock here?
##########
File path: remoting/etcdv3/client.go
##########
@@ -130,6 +131,26 @@ func ValidateClient(container clientFacade, opts ...Option) error {
return nil
}
+// NewServiceDiscoveryClient
Review comment:
```suggestion
// nolint
```
##########
File path: registry/servicediscovery/service_discovery_registry.go
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package servicediscovery
+
+import (
+ "bytes"
+ "encoding/json"
+ "strconv"
+ "strings"
+ "sync"
+)
+
+import (
+ cm "github.com/Workiva/go-datastructures/common"
+ gxset "github.com/dubbogo/gost/container/set"
+ gxnet "github.com/dubbogo/gost/net"
+ perrors "github.com/pkg/errors"
+ "go.uber.org/atomic"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/common/observer"
+ "github.com/apache/dubbo-go/config"
+ "github.com/apache/dubbo-go/metadata/mapping"
+ "github.com/apache/dubbo-go/metadata/service"
+ "github.com/apache/dubbo-go/metadata/service/exporter/configurable"
+ "github.com/apache/dubbo-go/registry"
+ "github.com/apache/dubbo-go/registry/event"
+ "github.com/apache/dubbo-go/registry/servicediscovery/synthesizer"
+ "github.com/apache/dubbo-go/remoting"
+)
+
+const (
+ protocolName = "service-discovery"
+)
+
+func init() {
+ extension.SetRegistry(protocolName, newServiceDiscoveryRegistry)
+}
+
+// serviceDiscoveryRegistry is the implementation of application-level registry.
+// It's completely different from other registry implementations
+// This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping
+// In order to keep compatible with interface-level registry,
+// this implementation is
+type serviceDiscoveryRegistry struct {
+ lock sync.RWMutex
+ url *common.URL
+ serviceDiscovery registry.ServiceDiscovery
+ subscribedServices *gxset.HashSet
+ serviceNameMapping mapping.ServiceNameMapping
+ metaDataService service.MetadataService
+ registeredListeners *gxset.HashSet
+ subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer
+ serviceRevisionExportedURLsCache map[string]map[string][]common.URL
+}
+
+func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
+
+ tryInitMetadataService()
+
+ serviceDiscovery, err := creatServiceDiscovery(url)
+ if err != nil {
+ return nil, err
+ }
+ subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, ""))
+ subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer()
+ serviceNameMapping := extension.GetGlobalServiceNameMapping()
+ metaDataService, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "could not init metadata service")
+ }
+ return &serviceDiscoveryRegistry{
+ url: url,
+ serviceDiscovery: serviceDiscovery,
+ subscribedServices: subscribedServices,
+ subscribedURLsSynthesizers: subscribedURLsSynthesizers,
+ registeredListeners: gxset.NewSet(),
+ serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL, 8),
+ serviceNameMapping: serviceNameMapping,
+ metaDataService: metaDataService,
+ }, nil
+}
+
+func (s *serviceDiscoveryRegistry) UnRegister(url common.URL) error {
+ if !shouldRegister(url) {
+ return nil
+ }
+ return s.metaDataService.UnexportURL(url)
+}
+
+func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registry.NotifyListener) error {
+ if !shouldSubscribe(*url) {
+ return nil
+ }
+ return s.metaDataService.UnsubscribeURL(*url)
+}
+
+func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
+ sdcName := url.GetParam(constant.SERVICE_DISCOVERY_KEY, "")
+ sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(sdcName)
+ if !ok {
+ return nil, perrors.Errorf("The service discovery with name: %s is not found", sdcName)
+ }
+ originServiceDiscovery, err := extension.GetServiceDiscovery(sdc.Protocol, sdcName)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "Create service discovery failed")
+ }
+ return event.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil
+}
+
+func parseServices(literalServices string) *gxset.HashSet {
+ set := gxset.NewSet()
+ if len(literalServices) == 0 {
+ return set
+ }
+ var splitServices = strings.Split(literalServices, ",")
+ for _, s := range splitServices {
+ if len(s) != 0 {
+ set.Add(s)
+ }
+ }
+ return set
+}
+
+func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery {
+ return s.serviceDiscovery
+}
+
+func (s *serviceDiscoveryRegistry) GetUrl() common.URL {
+ return *s.url
+}
+
+func (s *serviceDiscoveryRegistry) IsAvailable() bool {
+ // TODO(whether available depends on metadata service and service discovery)
+ return true
+}
+
+func (s *serviceDiscoveryRegistry) Destroy() {
+ err := s.serviceDiscovery.Destroy()
+ if err != nil {
+ logger.Errorf("destroy serviceDiscovery catch error:%s", err.Error())
+ }
+}
+
+func (s *serviceDiscoveryRegistry) Register(url common.URL) error {
+ if !shouldRegister(url) {
+ return nil
+ }
+ ok, err := s.metaDataService.ExportURL(url)
+
+ if err != nil {
+ logger.Errorf("The URL[%s] registry catch error:%s!", url.String(), err.Error())
+ return err
+ }
+ if !ok {
+ logger.Warnf("The URL[%s] has been registry!", url.String())
+ }
+
+ // we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap
+ // But we don't want to design a similar bootstrap class.
+ ins, err := createInstance(url)
+ if err != nil {
+ return perrors.WithMessage(err, "could not create service instance, please check your service url")
+ }
+
+ err = s.serviceDiscovery.Register(ins)
+ if err != nil {
+ return perrors.WithMessage(err, "register the service failed")
+ }
+
+ err = s.metaDataService.PublishServiceDefinition(url)
+ if err != nil {
+ return perrors.WithMessage(err, "publish the service definition failed. ")
+ }
+ return s.serviceNameMapping.Map(url.GetParam(constant.INTERFACE_KEY, ""),
+ url.GetParam(constant.GROUP_KEY, ""),
+ url.GetParam(constant.Version, ""),
+ url.Protocol)
+}
+
+func createInstance(url common.URL) (registry.ServiceInstance, error) {
+ appConfig := config.GetApplicationConfig()
+ port, err := strconv.ParseInt(url.Port, 10, 32)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "invalid port: "+url.Port)
+ }
+
+ host := url.Ip
+ if len(host) == 0 {
+ host, err = gxnet.GetLocalIP()
+ if err != nil {
+ return nil, perrors.WithMessage(err, "could not get the local Ip")
+ }
+ }
+
+ // usually we will add more metadata
+ metadata := make(map[string]string, 8)
+ metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType
+
+ return &registry.DefaultServiceInstance{
+ ServiceName: appConfig.Name,
+ Host: host,
+ Port: int(port),
+ Id: host + constant.KEY_SEPARATOR + url.Port,
+ Enable: true,
+ Healthy: true,
+ Metadata: metadata,
+ }, nil
+}
+
+func shouldRegister(url common.URL) bool {
+ side := url.GetParam(constant.SIDE_KEY, "")
+ if side == constant.PROVIDER_PROTOCOL {
+ return true
+ }
+ logger.Debugf("The URL[%s] should not be registered.", url.String())
+ return false
+}
+
+func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) error {
+ if !shouldSubscribe(*url) {
+ return nil
+ }
+ _, err := s.metaDataService.SubscribeURL(*url)
+ if err != nil {
+ return perrors.WithMessage(err, "subscribe url error: "+url.String())
+ }
+ services := s.getServices(*url)
+ if services.Empty() {
+ return perrors.Errorf("Should has at least one way to know which services this interface belongs to, "+
+ "subscription url:%s", url.String())
+ }
+ for _, srv := range services.Values() {
+ serviceName := srv.(string)
+ serviceInstances := s.serviceDiscovery.GetInstances(serviceName)
+ s.subscribe(url, notify, serviceName, serviceInstances)
+ listener := &registry.ServiceInstancesChangedListener{
+ ServiceName: serviceName,
+ ChangedNotify: &InstanceChangeNotify{
+ notify: notify,
+ serviceDiscoveryRegistry: s,
+ },
+ }
+ s.registerServiceInstancesChangedListener(*url, listener)
+ }
+ return nil
+}
+
+func (s *serviceDiscoveryRegistry) registerServiceInstancesChangedListener(url common.URL, listener *registry.ServiceInstancesChangedListener) {
+ listenerId := listener.ServiceName + ":" + getUrlKey(url)
+ if !s.subscribedServices.Contains(listenerId) {
+ err := s.serviceDiscovery.AddListener(listener)
+ if err != nil {
+ logger.Errorf("add listener[%s] catch error,url:%s err:%s", listenerId, url.String(), err.Error())
+ }
+ }
+
+}
+
+func getUrlKey(url common.URL) string {
+ var bf bytes.Buffer
+ if len(url.Protocol) != 0 {
+ bf.WriteString(url.Protocol)
+ bf.WriteString("://")
+ }
+ if len(url.Location) != 0 {
+ bf.WriteString(url.Location)
+ bf.WriteString(":")
+ bf.WriteString(url.Port)
+ }
+ if len(url.Path) != 0 {
+ bf.WriteString("/")
+ bf.WriteString(url.Path)
+ }
+ bf.WriteString("?")
+ // pass the buffer by pointer so that the writes in appendParam are not lost
+ appendParam(&bf, constant.VERSION_KEY, url)
+ appendParam(&bf, constant.GROUP_KEY, url)
+ appendParam(&bf, constant.NACOS_PROTOCOL_KEY, url)
+ return bf.String()
+}
+
+// appendParam writes "paramKey=value" into the buffer
+func appendParam(buffer *bytes.Buffer, paramKey string, url common.URL) {
+ buffer.WriteString(paramKey)
+ buffer.WriteString("=")
+ buffer.WriteString(url.GetParam(paramKey, ""))
+}
+
+func (s *serviceDiscoveryRegistry) subscribe(url *common.URL, notify registry.NotifyListener,
+ serviceName string, serviceInstances []registry.ServiceInstance) {
+ if len(serviceInstances) == 0 {
+ logger.Warnf("here is no instance in service[name : %s]", serviceName)
+ return
+ }
+ var subscribedURLs []common.URL
+ subscribedURLs = append(subscribedURLs, s.getExportedUrls(*url, serviceInstances)...)
+ if len(subscribedURLs) == 0 {
+ subscribedURLs = s.synthesizeSubscribedURLs(url, serviceInstances)
+ }
+ for _, url := range subscribedURLs {
+ notify.Notify(&registry.ServiceEvent{
+ Action: remoting.EventTypeAdd,
+ Service: url,
+ })
+ }
+
+}
+
+func (s *serviceDiscoveryRegistry) synthesizeSubscribedURLs(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []common.URL {
+ var urls []common.URL
+ for _, syn := range s.subscribedURLsSynthesizers {
+ if syn.Support(subscribedURL) {
+ urls = append(urls, syn.Synthesize(subscribedURL, serviceInstances)...)
+ }
+ }
+ return urls
+}
+
+func shouldSubscribe(url common.URL) bool {
+ return !shouldRegister(url)
+}
+
+func (s *serviceDiscoveryRegistry) getServices(url common.URL) *gxset.HashSet {
+ services := gxset.NewSet()
+ serviceNames := url.GetParam(constant.PROVIDER_BY, "")
+ if len(serviceNames) != 0 {
+ services = parseServices(serviceNames)
+ }
+ if services.Empty() {
+ services = s.findMappedServices(url)
+ if services.Empty() {
+ return s.subscribedServices
+ }
+ }
+ return services
+}
+
+func (s *serviceDiscoveryRegistry) findMappedServices(url common.URL) *gxset.HashSet {
+ serviceInterface := url.GetParam(constant.INTERFACE_KEY, url.Path)
+ group := url.GetParam(constant.GROUP_KEY, "")
+ version := url.GetParam(constant.VERSION_KEY, "")
+ protocol := url.Protocol
+ serviceNames, err := s.serviceNameMapping.Get(serviceInterface, group, version, protocol)
+ if err != nil {
+ logger.Errorf("get serviceInterface:[%s] group:[%s] version:[%s] protocol:[%s] from "+
+ "serviceNameMap error:%s", err.Error())
+ return gxset.NewSet()
+ }
+ return serviceNames
+}
+
+func (s *serviceDiscoveryRegistry) getExportedUrls(subscribedURL common.URL, serviceInstances []registry.ServiceInstance) []common.URL {
+ var filterInstances []registry.ServiceInstance
+ for _, s := range serviceInstances {
+ if !s.IsEnable() || !s.IsHealthy() {
+ continue
+ }
+ metaData := s.GetMetadata()
+ _, ok1 := metaData[constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME]
+ _, ok2 := metaData[constant.METADATA_SERVICE_URLS_PROPERTY_NAME]
+ if !ok1 && !ok2 {
+ continue
+ }
+ filterInstances = append(filterInstances, s)
+ }
+ if len(filterInstances) == 0 {
+ return []common.URL{}
+ }
+ s.prepareServiceRevisionExportedURLs(filterInstances)
+ subscribedURLs := s.cloneExportedURLs(subscribedURL, filterInstances)
+ return subscribedURLs
+}
+
+// comparator is defined as Comparator for skip list to compare the URL
+type comparator common.URL
+
+// Compare is defined as Comparator for skip list to compare the URL
+func (c comparator) Compare(comp cm.Comparator) int {
+ a := common.URL(c).String()
+ b := common.URL(comp.(comparator)).String()
+ switch {
+ case a > b:
+ return 1
+ case a < b:
+ return -1
+ default:
+ return 0
+ }
+}
+
+func (s *serviceDiscoveryRegistry) getExportedUrlsByInst(serviceInstance registry.ServiceInstance) []common.URL {
+ var urls []common.URL
+ metadataStorageType := getExportedStoreType(serviceInstance)
+ proxyFactory := extension.GetMetadataServiceProxyFactory(metadataStorageType)
+ if proxyFactory == nil {
+ return urls
+ }
+ metadataService := proxyFactory.GetProxy(serviceInstance)
+ if metadataService == nil {
+ return urls
+ }
+ result, err := metadataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE)
+ if err != nil {
+ logger.Errorf("get exported urls catch error:%s,instance:%+v", err.Error(), serviceInstance)
+ return urls
+ }
+
+ ret := make([]common.URL, 0, len(result))
+ for _, ui := range result {
+
+ u, err := common.NewURL(ui.(string))
+
+ if err != nil {
+ logger.Errorf("could not parse the url string to URL structure: %s", ui.(string), err)
+ continue
+ }
+ ret = append(ret, u)
+ }
+ return ret
+}
+
+func (s *serviceDiscoveryRegistry) prepareServiceRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+ s.lock.Lock()
+ // 1. expunge stale
+ s.expungeStaleRevisionExportedURLs(serviceInstances)
+ // 2. Initialize
+ s.initRevisionExportedURLs(serviceInstances)
+ s.lock.Unlock()
+}
+
+func (s *serviceDiscoveryRegistry) expungeStaleRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+ serviceName := serviceInstances[0].GetServiceName()
+ revisionExportedURLsMap, exist := s.serviceRevisionExportedURLsCache[serviceName]
+ if !exist {
+ return
+ }
+ existRevision := gxset.NewSet()
+ for k := range revisionExportedURLsMap {
+ existRevision.Add(k)
+ }
+ currentRevision := gxset.NewSet()
+ for _, s := range serviceInstances {
+ rv := getExportedServicesRevision(s)
+ if len(rv) != 0 {
+ currentRevision.Add(rv)
+ }
+ }
+ // staleRevisions = existedRevisions(copy) - currentRevisions
+ staleRevision := gxset.NewSet(existRevision.Values()...)
+ staleRevision.Remove(currentRevision.Values()...)
+ // remove exported URLs if staled
+ for _, s := range staleRevision.Values() {
+ delete(revisionExportedURLsMap, s.(string))
+ }
+}
+
+func (s *serviceDiscoveryRegistry) initRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+ // initialize the revision exported URLs that the selected service instance exported
+ s.initSelectedRevisionExportedURLs(serviceInstances)
+ // initialize the revision exported URLs that other service instances exported
+ for _, serviceInstance := range serviceInstances {
+ s.initRevisionExportedURLsByInst(serviceInstance)
+ }
+}
+
+func (s *serviceDiscoveryRegistry) initSelectedRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+ for range serviceInstances {
+ selectServiceInstance := s.selectServiceInstance(serviceInstances)
+ revisionExportedURLs := s.initRevisionExportedURLsByInst(selectServiceInstance)
+ if len(revisionExportedURLs) != 0 {
+ // If the result is valid,break
+ break
+ }
+ }
+}
+
+func (s *serviceDiscoveryRegistry) selectServiceInstance(serviceInstances []registry.ServiceInstance) registry.ServiceInstance {
+ size := len(serviceInstances)
+ if size == 0 {
+ return nil
+ }
+ if size == 1 {
+ return serviceInstances[0]
+ }
+ selectorName := s.url.GetParam(constant.SERVICE_INSTANCE_SELECTOR, "random")
+ selector, err := extension.GetServiceInstanceSelector(selectorName)
+ if err != nil {
+ logger.Errorf("get service instance selector cathe error:%s", err.Error())
+ return nil
+ }
+ return selector.Select(*s.url, serviceInstances)
+}
+
+func (s *serviceDiscoveryRegistry) initRevisionExportedURLsByInst(serviceInstance registry.ServiceInstance) []common.URL {
+ if serviceInstance == nil {
+ return []common.URL{}
+ }
+ serviceName := serviceInstance.GetServiceName()
+ revision := getExportedServicesRevision(serviceInstance)
+ revisionExportedURLsMap := s.serviceRevisionExportedURLsCache[serviceName]
+ if revisionExportedURLsMap == nil {
+ revisionExportedURLsMap = make(map[string][]common.URL, 4)
+ s.serviceRevisionExportedURLsCache[serviceName] = revisionExportedURLsMap
+ }
+ revisionExportedURLs := revisionExportedURLsMap[revision]
+ firstGet := false
+ if revisionExportedURLs == nil || len(revisionExportedURLs) == 0 {
+ if len(revisionExportedURLsMap) > 0 {
+ // The case is that current ServiceInstance with the different revision
+ logger.Warnf("The ServiceInstance[id: %s, host : %s , port : %s] has different revision : %s"+
+ ", please make sure the service [name : %s] is changing or not.", serviceInstance.GetId(),
+ serviceInstance.GetHost(), serviceInstance.GetPort(), revision, serviceInstance.GetServiceName())
+ } else {
+ firstGet = true
+ }
+ revisionExportedURLs = s.getExportedUrlsByInst(serviceInstance)
+ if revisionExportedURLs != nil {
+ revisionExportedURLsMap[revision] = revisionExportedURLs
+ logger.Debugf("Get the exported URLs[size : %s, first : %s] from the target service "+
+ "instance [id: %s , service : %s , host : %s , port : %s , revision : %s]",
+ len(revisionExportedURLs), firstGet, serviceInstance.GetId(), serviceInstance.GetServiceName(),
+ serviceInstance.GetHost(), serviceInstance.GetPort(), revision)
+ }
+ } else {
+ // Else, The cache is hit
+ logger.Debugf("Get the exported URLs[size : %s] from cache, the instance"+
+ "[id: %s , service : %s , host : %s , port : %s , revision : %s]", len(revisionExportedURLs), firstGet,
+ serviceInstance.GetId(), serviceInstance.GetServiceName(), serviceInstance.GetHost(),
+ serviceInstance.GetPort(), revision)
+ }
+ return revisionExportedURLs
+}
+
+func getExportedServicesRevision(serviceInstance registry.ServiceInstance) string {
+ metaData := serviceInstance.GetMetadata()
+ return metaData[constant.EXPORTED_SERVICES_REVISION_PROPERTY_NAME]
+}
+
+func getExportedStoreType(serviceInstance registry.ServiceInstance) string {
+ metaData := serviceInstance.GetMetadata()
+ result, ok := metaData[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME]
+ if !ok {
+ return constant.DEFAULT_METADATA_STORAGE_TYPE
+ }
+ return result
+}
+
+func (s *serviceDiscoveryRegistry) cloneExportedURLs(url common.URL, serviceInsances []registry.ServiceInstance) []common.URL {
+ if serviceInsances == nil || len(serviceInsances) == 0 {
Review comment:
```suggestion
if len(serviceInsances) == 0 {
```
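
The suggestion relies on the fact that `len` of a nil slice is 0 in Go, so the explicit `== nil` check is redundant. A minimal sketch of that behavior follows; `isEmpty` and the `[]string` element type are illustrative stand-ins, not names from this PR:

```go
package main

import "fmt"

// isEmpty reports whether a slice has no elements.
// len of a nil slice is 0, so no separate nil check is needed.
// (Hypothetical helper, used only for illustration.)
func isEmpty(instances []string) bool {
	return len(instances) == 0
}

func main() {
	var nilSlice []string      // nil slice, never allocated
	emptySlice := []string{}   // non-nil slice with zero elements
	oneItem := []string{"svc"} // slice with one element

	fmt.Println(isEmpty(nilSlice), isEmpty(emptySlice), isEmpty(oneItem))
}
```

Both the nil slice and the empty slice take the same branch, which is why the shorter condition should behave identically here.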
##########
File path: remoting/zookeeper/curator_discovery/service_discovery.go
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package curator_discovery
+
+import (
+ "encoding/json"
+ "path"
+ "strings"
+ "sync"
+)
+
+import (
+ perrors "github.com/pkg/errors"
+)
+
+import (
+ "github.com/dubbogo/go-zookeeper/zk"
Review comment:
- -?
##########
File path: registry/servicediscovery/service_discovery_registry.go
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package servicediscovery
+
+import (
+ "bytes"
+ "encoding/json"
+ "strconv"
+ "strings"
+ "sync"
+)
+
+import (
+ cm "github.com/Workiva/go-datastructures/common"
+ gxset "github.com/dubbogo/gost/container/set"
+ gxnet "github.com/dubbogo/gost/net"
+ perrors "github.com/pkg/errors"
+ "go.uber.org/atomic"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/common/observer"
+ "github.com/apache/dubbo-go/config"
+ "github.com/apache/dubbo-go/metadata/mapping"
+ "github.com/apache/dubbo-go/metadata/service"
+ "github.com/apache/dubbo-go/metadata/service/exporter/configurable"
+ "github.com/apache/dubbo-go/registry"
+ "github.com/apache/dubbo-go/registry/event"
+ "github.com/apache/dubbo-go/registry/servicediscovery/synthesizer"
+ "github.com/apache/dubbo-go/remoting"
+)
+
+const (
+ protocolName = "service-discovery"
+)
+
+func init() {
+ extension.SetRegistry(protocolName, newServiceDiscoveryRegistry)
+}
+
+// serviceDiscoveryRegistry is the implementation of application-level registry.
+// It's completely different from other registry implementations
+// This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping
+// In order to keep compatible with interface-level registry,
+// this implementation is
+type serviceDiscoveryRegistry struct {
+ lock sync.RWMutex
+ url *common.URL
+ serviceDiscovery registry.ServiceDiscovery
+ subscribedServices *gxset.HashSet
+ serviceNameMapping mapping.ServiceNameMapping
+ metaDataService service.MetadataService
+ registeredListeners *gxset.HashSet
+ subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer
+ serviceRevisionExportedURLsCache map[string]map[string][]common.URL
+}
+
+func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
+
+ tryInitMetadataService()
+
+ serviceDiscovery, err := creatServiceDiscovery(url)
+ if err != nil {
+ return nil, err
+ }
+ subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, ""))
+ subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer()
+ serviceNameMapping := extension.GetGlobalServiceNameMapping()
+ metaDataService, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "could not init metadata service")
+ }
+ return &serviceDiscoveryRegistry{
+ url: url,
+ serviceDiscovery: serviceDiscovery,
+ subscribedServices: subscribedServices,
+ subscribedURLsSynthesizers: subscribedURLsSynthesizers,
+ registeredListeners: gxset.NewSet(),
+ serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL),
+ serviceNameMapping: serviceNameMapping,
+ metaDataService: metaDataService,
+ }, nil
+}
+
+func (s *serviceDiscoveryRegistry) UnRegister(url common.URL) error {
+ if !shouldRegister(url) {
+ return nil
+ }
+ return s.metaDataService.UnexportURL(url)
+}
+
+func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registry.NotifyListener) error {
+ if !shouldSubscribe(*url) {
+ return nil
+ }
+ return s.metaDataService.UnsubscribeURL(*url)
+}
+
+func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
+ sdcName := url.GetParam(constant.SERVICE_DISCOVERY_KEY, "")
+ sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(sdcName)
+ if !ok {
+ return nil, perrors.Errorf("The service discovery with name: %s is not found", sdcName)
+ }
+ originServiceDiscovery, err := extension.GetServiceDiscovery(sdc.Protocol, sdcName)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "Create service discovery fialed")
+ }
+ return event.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil
+}
+
+func parseServices(literalServices string) *gxset.HashSet {
+ set := gxset.NewSet()
+ if len(literalServices) == 0 {
+ return set
+ }
+ var splitServices = strings.Split(literalServices, ",")
+ for _, s := range splitServices {
+ if len(s) != 0 {
+ set.Add(s)
+ }
+ }
+ return set
+}
+
+func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery {
+ return s.serviceDiscovery
+}
+
+func (s *serviceDiscoveryRegistry) GetUrl() common.URL {
+ return *s.url
+}
+
+func (s *serviceDiscoveryRegistry) IsAvailable() bool {
+ // TODO(whether available depends on metadata service and service discovery)
+ return true
+}
+
+func (s *serviceDiscoveryRegistry) Destroy() {
+ err := s.serviceDiscovery.Destroy()
+ if err != nil {
+ logger.Errorf("destroy serviceDiscovery catch error:%s", err.Error())
+ }
+}
+
+func (s *serviceDiscoveryRegistry) Register(url common.URL) error {
+ if !shouldRegister(url) {
+ return nil
+ }
+ ok, err := s.metaDataService.ExportURL(url)
+
+ if err != nil {
+ logger.Errorf("The URL[%s] registry catch error:%s!", url.String(), err.Error())
+ return err
+ }
+ if !ok {
+ logger.Warnf("The URL[%s] has been registry!", url.String())
+ }
+
+ // we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap
+ // But we don't want to design a similar bootstrap class.
+ ins, err := createInstance(url)
+ if err != nil {
+ return perrors.WithMessage(err, "could not create servcie instance, please check your service url")
+ }
+
+ err = s.serviceDiscovery.Register(ins)
+ if err != nil {
+ return perrors.WithMessage(err, "register the service failed")
+ }
+
+ err = s.metaDataService.PublishServiceDefinition(url)
+ if err != nil {
+ return perrors.WithMessage(err, "publish the service definition failed. ")
+ }
+ return s.serviceNameMapping.Map(url.GetParam(constant.INTERFACE_KEY, ""),
+ url.GetParam(constant.GROUP_KEY, ""),
+ url.GetParam(constant.Version, ""),
+ url.Protocol)
+}
+
+func createInstance(url common.URL) (registry.ServiceInstance, error) {
+ appConfig := config.GetApplicationConfig()
+ port, err := strconv.ParseInt(url.Port, 10, 32)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "invalid port: "+url.Port)
+ }
+
+ host := url.Ip
+ if len(host) == 0 {
+ host, err = gxnet.GetLocalIP()
+ if err != nil {
+ return nil, perrors.WithMessage(err, "could not get the local Ip")
+ }
+ }
+
+ // usually we will add more metadata
+ metadata := make(map[string]string, 8)
+ metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType
+
+ return &registry.DefaultServiceInstance{
+ ServiceName: appConfig.Name,
+ Host: host,
+ Port: int(port),
+ Id: host + constant.KEY_SEPARATOR + url.Port,
+ Enable: true,
+ Healthy: true,
+ Metadata: metadata,
+ }, nil
+}
+
+func shouldRegister(url common.URL) bool {
+ side := url.GetParam(constant.SIDE_KEY, "")
+ if side == constant.PROVIDER_PROTOCOL {
+ return true
+ }
+ logger.Debugf("The URL should not be register.", url.String())
+ return false
+}
+
+func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) error {
+ if !shouldSubscribe(*url) {
+ return nil
+ }
+ _, err := s.metaDataService.SubscribeURL(*url)
+ if err != nil {
+ return perrors.WithMessage(err, "subscribe url error: "+url.String())
+ }
+ services := s.getServices(*url)
+ if services.Empty() {
+ return perrors.Errorf("Should has at least one way to know which services this interface belongs to, "+
+ "subscription url:%s", url.String())
+ }
+ for _, srv := range services.Values() {
+ serviceName := srv.(string)
+ serviceInstances := s.serviceDiscovery.GetInstances(serviceName)
+ s.subscribe(url, notify, serviceName, serviceInstances)
+ listener := &registry.ServiceInstancesChangedListener{
+ ServiceName: serviceName,
+ ChangedNotify: &InstanceChangeNotify{
+ notify: notify,
+ serviceDiscoveryRegistry: s,
+ },
+ }
+ s.registerServiceInstancesChangedListener(*url, listener)
+ }
+
+ return nil
+}
Review comment:
Where? He has already added a blank line between `Subscribe` and `registerServiceInstancesChangedListener`.
##########
File path: registry/servicediscovery/service_discovery_registry.go
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package servicediscovery
+
+import (
+ "bytes"
+ "encoding/json"
+ "strconv"
+ "strings"
+ "sync"
+)
+
+import (
+ cm "github.com/Workiva/go-datastructures/common"
+ gxset "github.com/dubbogo/gost/container/set"
+ gxnet "github.com/dubbogo/gost/net"
+ perrors "github.com/pkg/errors"
+ "go.uber.org/atomic"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/common/observer"
+ "github.com/apache/dubbo-go/config"
+ "github.com/apache/dubbo-go/metadata/mapping"
+ "github.com/apache/dubbo-go/metadata/service"
+ "github.com/apache/dubbo-go/metadata/service/exporter/configurable"
+ "github.com/apache/dubbo-go/registry"
+ "github.com/apache/dubbo-go/registry/event"
+ "github.com/apache/dubbo-go/registry/servicediscovery/synthesizer"
+ "github.com/apache/dubbo-go/remoting"
+)
+
+const (
+ protocolName = "service-discovery"
+)
+
+func init() {
+ extension.SetRegistry(protocolName, newServiceDiscoveryRegistry)
+}
+
+// serviceDiscoveryRegistry is the implementation of application-level registry.
+// It's completely different from other registry implementations
+// This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping
+// In order to keep compatible with interface-level registry,
+// this implementation is
+type serviceDiscoveryRegistry struct {
+ lock sync.RWMutex
+ url *common.URL
+ serviceDiscovery registry.ServiceDiscovery
+ subscribedServices *gxset.HashSet
+ serviceNameMapping mapping.ServiceNameMapping
+ metaDataService service.MetadataService
+ registeredListeners *gxset.HashSet
+ subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer
+ serviceRevisionExportedURLsCache map[string]map[string][]common.URL
+}
+
+func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
+
+ tryInitMetadataService()
+
+ serviceDiscovery, err := creatServiceDiscovery(url)
+ if err != nil {
+ return nil, err
+ }
+ subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, ""))
+ subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer()
+ serviceNameMapping := extension.GetGlobalServiceNameMapping()
+ metaDataService, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "could not init metadata service")
+ }
+ return &serviceDiscoveryRegistry{
+ url: url,
+ serviceDiscovery: serviceDiscovery,
+ subscribedServices: subscribedServices,
+ subscribedURLsSynthesizers: subscribedURLsSynthesizers,
+ registeredListeners: gxset.NewSet(),
+ serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL, 8),
+ serviceNameMapping: serviceNameMapping,
+ metaDataService: metaDataService,
+ }, nil
+}
+
+func (s *serviceDiscoveryRegistry) UnRegister(url common.URL) error {
+ if !shouldRegister(url) {
+ return nil
+ }
+ return s.metaDataService.UnexportURL(url)
+}
+
+func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registry.NotifyListener) error {
+ if !shouldSubscribe(*url) {
+ return nil
+ }
+ return s.metaDataService.UnsubscribeURL(*url)
+}
+
+func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
+ sdcName := url.GetParam(constant.SERVICE_DISCOVERY_KEY, "")
+ sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(sdcName)
+ if !ok {
+ return nil, perrors.Errorf("The service discovery with name: %s is not found", sdcName)
+ }
+ originServiceDiscovery, err := extension.GetServiceDiscovery(sdc.Protocol, sdcName)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "Create service discovery fialed")
+ }
+ return event.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil
+}
+
+func parseServices(literalServices string) *gxset.HashSet {
+ set := gxset.NewSet()
+ if len(literalServices) == 0 {
+ return set
+ }
+ var splitServices = strings.Split(literalServices, ",")
+ for _, s := range splitServices {
+ if len(s) != 0 {
+ set.Add(s)
+ }
+ }
+ return set
+}
+
+func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery {
+ return s.serviceDiscovery
+}
+
+func (s *serviceDiscoveryRegistry) GetUrl() common.URL {
+ return *s.url
+}
+
+func (s *serviceDiscoveryRegistry) IsAvailable() bool {
+ // TODO(whether available depends on metadata service and service discovery)
+ return true
+}
+
+func (s *serviceDiscoveryRegistry) Destroy() {
+ err := s.serviceDiscovery.Destroy()
+ if err != nil {
+ logger.Errorf("destroy serviceDiscovery catch error:%s", err.Error())
+ }
+}
+
+func (s *serviceDiscoveryRegistry) Register(url common.URL) error {
+ if !shouldRegister(url) {
+ return nil
+ }
+ ok, err := s.metaDataService.ExportURL(url)
+
+ if err != nil {
+ logger.Errorf("The URL[%s] registry catch error:%s!", url.String(), err.Error())
+ return err
+ }
+ if !ok {
+ logger.Warnf("The URL[%s] has been registry!", url.String())
+ }
+
+ // we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap
+ // But we don't want to design a similar bootstrap class.
+ ins, err := createInstance(url)
+ if err != nil {
+ return perrors.WithMessage(err, "could not create servcie instance, please check your service url")
+ }
+
+ err = s.serviceDiscovery.Register(ins)
+ if err != nil {
+ return perrors.WithMessage(err, "register the service failed")
+ }
+
+ err = s.metaDataService.PublishServiceDefinition(url)
+ if err != nil {
+ return perrors.WithMessage(err, "publish the service definition failed. ")
+ }
+ return s.serviceNameMapping.Map(url.GetParam(constant.INTERFACE_KEY, ""),
+ url.GetParam(constant.GROUP_KEY, ""),
+ url.GetParam(constant.Version, ""),
+ url.Protocol)
+}
+
+func createInstance(url common.URL) (registry.ServiceInstance, error) {
+ appConfig := config.GetApplicationConfig()
+ port, err := strconv.ParseInt(url.Port, 10, 32)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "invalid port: "+url.Port)
+ }
+
+ host := url.Ip
+ if len(host) == 0 {
+ host, err = gxnet.GetLocalIP()
+ if err != nil {
+ return nil, perrors.WithMessage(err, "could not get the local Ip")
+ }
+ }
+
+ // usually we will add more metadata
+ metadata := make(map[string]string, 8)
+ metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType
+
+ return &registry.DefaultServiceInstance{
+ ServiceName: appConfig.Name,
+ Host: host,
+ Port: int(port),
+ Id: host + constant.KEY_SEPARATOR + url.Port,
+ Enable: true,
+ Healthy: true,
+ Metadata: metadata,
+ }, nil
+}
+
+func shouldRegister(url common.URL) bool {
+ side := url.GetParam(constant.SIDE_KEY, "")
+ if side == constant.PROVIDER_PROTOCOL {
+ return true
+ }
+ logger.Debugf("The URL should not be register.", url.String())
+ return false
+}
+
+func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) error {
+ if !shouldSubscribe(*url) {
+ return nil
+ }
+ _, err := s.metaDataService.SubscribeURL(*url)
+ if err != nil {
+ return perrors.WithMessage(err, "subscribe url error: "+url.String())
+ }
+ services := s.getServices(*url)
+ if services.Empty() {
+ return perrors.Errorf("Should has at least one way to know which services this interface belongs to, "+
+ "subscription url:%s", url.String())
+ }
+ for _, srv := range services.Values() {
+ serviceName := srv.(string)
+ serviceInstances := s.serviceDiscovery.GetInstances(serviceName)
+ s.subscribe(url, notify, serviceName, serviceInstances)
+ listener := &registry.ServiceInstancesChangedListener{
+ ServiceName: serviceName,
+ ChangedNotify: &InstanceChangeNotify{
+ notify: notify,
+ serviceDiscoveryRegistry: s,
+ },
+ }
+ s.registerServiceInstancesChangedListener(*url, listener)
+ }
+ return nil
+}
+
+func (s *serviceDiscoveryRegistry) registerServiceInstancesChangedListener(url common.URL, listener *registry.ServiceInstancesChangedListener) {
+ listenerId := listener.ServiceName + ":" + getUrlKey(url)
+ if !s.subscribedServices.Contains(listenerId) {
+ err := s.serviceDiscovery.AddListener(listener)
+ if err != nil {
+ logger.Errorf("add listener[%s] catch error,url:%s err:%s", listenerId, url.String(), err.Error())
+ }
+ }
+
+}
+
+func getUrlKey(url common.URL) string {
+ var bf bytes.Buffer
+ if len(url.Protocol) != 0 {
+ bf.WriteString(url.Protocol)
+ bf.WriteString("://")
+ }
+ if len(url.Location) != 0 {
+ bf.WriteString(url.Location)
+ bf.WriteString(":")
+ bf.WriteString(url.Port)
+ }
+ if len(url.Path) != 0 {
+ bf.WriteString("/")
+ bf.WriteString(url.Path)
+ }
+ bf.WriteString("?")
+ appendParam(bf, constant.VERSION_KEY, url)
+ appendParam(bf, constant.GROUP_KEY, url)
+ appendParam(bf, constant.NACOS_PROTOCOL_KEY, url)
+ return bf.String()
+}
+
+func appendParam(buffer bytes.Buffer, paramKey string, url common.URL) {
+ buffer.WriteString(paramKey)
+ buffer.WriteString("=")
+ buffer.WriteString(url.GetParam(paramKey, ""))
+}
+
+func (s *serviceDiscoveryRegistry) subscribe(url *common.URL, notify registry.NotifyListener,
+ serviceName string, serviceInstances []registry.ServiceInstance) {
+ if len(serviceInstances) == 0 {
+ logger.Warnf("here is no instance in service[name : %s]", serviceName)
+ return
+ }
+ var subscribedURLs []common.URL
+ subscribedURLs = append(subscribedURLs, s.getExportedUrls(*url, serviceInstances)...)
+ if len(subscribedURLs) == 0 {
+ subscribedURLs = s.synthesizeSubscribedURLs(url, serviceInstances)
+ }
+ for _, url := range subscribedURLs {
+ notify.Notify(&registry.ServiceEvent{
+ Action: remoting.EventTypeAdd,
+ Service: url,
+ })
+ }
+
+}
+
+func (s *serviceDiscoveryRegistry) synthesizeSubscribedURLs(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []common.URL {
+ var urls []common.URL
+ for _, syn := range s.subscribedURLsSynthesizers {
+ if syn.Support(subscribedURL) {
+ urls = append(urls, syn.Synthesize(subscribedURL, serviceInstances)...)
+ }
+ }
+ return urls
+}
+
+func shouldSubscribe(url common.URL) bool {
+ return !shouldRegister(url)
+}
+
+func (s *serviceDiscoveryRegistry) getServices(url common.URL) *gxset.HashSet {
+ services := gxset.NewSet()
+ serviceNames := url.GetParam(constant.PROVIDER_BY, "")
+ if len(serviceNames) != 0 {
Review comment:
```suggestion
if len(serviceNames) > 0 {
```
?
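
For context, `len(serviceNames) != 0` and `len(serviceNames) > 0` are equivalent because `len` never returns a negative value, so the change is stylistic. The lookup order that `getServices` appears to follow in the quoted hunk (explicit names from the URL parameter, then the service-name mapping, then the registry's default subscribed services) can be sketched roughly as below. This is a simplified illustration: `parseNames`, `resolveServices`, and the plain `[]string` results are assumptions standing in for `parseServices`, `getServices`, and `*gxset.HashSet`.

```go
package main

import (
	"fmt"
	"strings"
)

// parseNames splits a comma-separated list and drops empty entries.
// (Hypothetical stand-in for parseServices; returns a slice, not a set.)
func parseNames(literal string) []string {
	var names []string
	for _, n := range strings.Split(literal, ",") {
		if len(n) > 0 {
			names = append(names, n)
		}
	}
	return names
}

// resolveServices tries three sources in order and returns the first
// non-empty result: explicit names, a mapping lookup, then defaults.
func resolveServices(providedBy string, lookup func() []string, defaults []string) []string {
	if names := parseNames(providedBy); len(names) > 0 {
		return names
	}
	if names := lookup(); len(names) > 0 {
		return names
	}
	return defaults
}

func main() {
	noMapping := func() []string { return nil }
	mapped := func() []string { return []string{"mapped-app"} }
	defaults := []string{"default-app"}

	fmt.Println(resolveServices("app-a,,app-b", noMapping, defaults))
	fmt.Println(resolveServices("", mapped, defaults))
	fmt.Println(resolveServices("", noMapping, defaults))
}
```

If all three sources are empty the result is empty as well, which corresponds to the case where `Subscribe` returns its "at least one way to know which services this interface belongs to" error.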
##########
File path: registry/servicediscovery/service_discovery_registry.go
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package servicediscovery
+
+import (
+ "bytes"
+ "encoding/json"
+ "strconv"
+ "strings"
+ "sync"
+)
+
+import (
+ cm "github.com/Workiva/go-datastructures/common"
+ gxset "github.com/dubbogo/gost/container/set"
+ gxnet "github.com/dubbogo/gost/net"
+ perrors "github.com/pkg/errors"
+ "go.uber.org/atomic"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/common/observer"
+ "github.com/apache/dubbo-go/config"
+ "github.com/apache/dubbo-go/metadata/mapping"
+ "github.com/apache/dubbo-go/metadata/service"
+ "github.com/apache/dubbo-go/metadata/service/exporter/configurable"
+ "github.com/apache/dubbo-go/registry"
+ "github.com/apache/dubbo-go/registry/event"
+ "github.com/apache/dubbo-go/registry/servicediscovery/synthesizer"
+ "github.com/apache/dubbo-go/remoting"
+)
+
+const (
+ protocolName = "service-discovery"
+)
+
+func init() {
+ extension.SetRegistry(protocolName, newServiceDiscoveryRegistry)
+}
+
+// serviceDiscoveryRegistry is the implementation of application-level registry.
+// It's completely different from other registry implementations
+// This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping
+// In order to keep compatible with interface-level registry,
+// this implementation is
+type serviceDiscoveryRegistry struct {
+ lock sync.RWMutex
+ url *common.URL
+ serviceDiscovery registry.ServiceDiscovery
+ subscribedServices *gxset.HashSet
+ serviceNameMapping mapping.ServiceNameMapping
+ metaDataService service.MetadataService
+ registeredListeners *gxset.HashSet
+ subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer
+ serviceRevisionExportedURLsCache map[string]map[string][]common.URL
+}
+
+func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
+
+ tryInitMetadataService()
+
+ serviceDiscovery, err := creatServiceDiscovery(url)
+ if err != nil {
+ return nil, err
+ }
+ subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, ""))
+ subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer()
+ serviceNameMapping := extension.GetGlobalServiceNameMapping()
+ metaDataService, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "could not init metadata service")
+ }
+ return &serviceDiscoveryRegistry{
+ url: url,
+ serviceDiscovery: serviceDiscovery,
+ subscribedServices: subscribedServices,
+ subscribedURLsSynthesizers: subscribedURLsSynthesizers,
+ registeredListeners: gxset.NewSet(),
+ serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL, 8),
+ serviceNameMapping: serviceNameMapping,
+ metaDataService: metaDataService,
+ }, nil
+}
+
+func (s *serviceDiscoveryRegistry) UnRegister(url common.URL) error {
+ if !shouldRegister(url) {
+ return nil
+ }
+ return s.metaDataService.UnexportURL(url)
+}
+
+func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registry.NotifyListener) error {
+ if !shouldSubscribe(*url) {
+ return nil
+ }
+ return s.metaDataService.UnsubscribeURL(*url)
+}
+
+func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
+ sdcName := url.GetParam(constant.SERVICE_DISCOVERY_KEY, "")
+ sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(sdcName)
+ if !ok {
+ return nil, perrors.Errorf("The service discovery with name: %s is not found", sdcName)
+ }
+ originServiceDiscovery, err := extension.GetServiceDiscovery(sdc.Protocol, sdcName)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "Create service discovery fialed")
+ }
+ return event.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil
+}
+
+func parseServices(literalServices string) *gxset.HashSet {
+ set := gxset.NewSet()
+ if len(literalServices) == 0 {
+ return set
+ }
+ var splitServices = strings.Split(literalServices, ",")
+ for _, s := range splitServices {
+ if len(s) != 0 {
+ set.Add(s)
+ }
+ }
+ return set
+}
+
+func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery {
+ return s.serviceDiscovery
+}
+
+func (s *serviceDiscoveryRegistry) GetUrl() common.URL {
+ return *s.url
+}
+
+func (s *serviceDiscoveryRegistry) IsAvailable() bool {
+ // TODO(whether available depends on metadata service and service discovery)
+ return true
+}
+
+func (s *serviceDiscoveryRegistry) Destroy() {
+ err := s.serviceDiscovery.Destroy()
+ if err != nil {
+ logger.Errorf("destroy serviceDiscovery catch error:%s", err.Error())
+ }
+}
+
+func (s *serviceDiscoveryRegistry) Register(url common.URL) error {
+ if !shouldRegister(url) {
+ return nil
+ }
+ ok, err := s.metaDataService.ExportURL(url)
+
+ if err != nil {
+ logger.Errorf("The URL[%s] registry catch error:%s!", url.String(), err.Error())
+ return err
+ }
+ if !ok {
+ logger.Warnf("The URL[%s] has been registry!", url.String())
+ }
+
+ // we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap
+ // But we don't want to design a similar bootstrap class.
+ ins, err := createInstance(url)
+ if err != nil {
+ return perrors.WithMessage(err, "could not create servcie instance, please check your service url")
+ }
+
+ err = s.serviceDiscovery.Register(ins)
+ if err != nil {
+ return perrors.WithMessage(err, "register the service failed")
+ }
+
+ err = s.metaDataService.PublishServiceDefinition(url)
+ if err != nil {
+ return perrors.WithMessage(err, "publish the service definition failed. ")
+ }
+ return s.serviceNameMapping.Map(url.GetParam(constant.INTERFACE_KEY, ""),
+ url.GetParam(constant.GROUP_KEY, ""),
+ url.GetParam(constant.Version, ""),
+ url.Protocol)
+}
+
+func createInstance(url common.URL) (registry.ServiceInstance, error) {
+ appConfig := config.GetApplicationConfig()
+ port, err := strconv.ParseInt(url.Port, 10, 32)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "invalid port: "+url.Port)
+ }
+
+ host := url.Ip
+ if len(host) == 0 {
+ host, err = gxnet.GetLocalIP()
+ if err != nil {
+ return nil, perrors.WithMessage(err, "could not get the local Ip")
+ }
+ }
+
+ // usually we will add more metadata
+ metadata := make(map[string]string, 8)
+ metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType
+
+ return &registry.DefaultServiceInstance{
+ ServiceName: appConfig.Name,
+ Host: host,
+ Port: int(port),
+ Id: host + constant.KEY_SEPARATOR + url.Port,
+ Enable: true,
+ Healthy: true,
+ Metadata: metadata,
+ }, nil
+}
+
+func shouldRegister(url common.URL) bool {
+ side := url.GetParam(constant.SIDE_KEY, "")
+ if side == constant.PROVIDER_PROTOCOL {
+ return true
+ }
+ logger.Debugf("The URL should not be register.", url.String())
+ return false
+}
+
+func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) error {
+ if !shouldSubscribe(*url) {
+ return nil
+ }
+ _, err := s.metaDataService.SubscribeURL(*url)
+ if err != nil {
+ return perrors.WithMessage(err, "subscribe url error: "+url.String())
+ }
+ services := s.getServices(*url)
+ if services.Empty() {
+ return perrors.Errorf("Should has at least one way to know which services this interface belongs to, "+
+ "subscription url:%s", url.String())
+ }
+ for _, srv := range services.Values() {
+ serviceName := srv.(string)
+ serviceInstances := s.serviceDiscovery.GetInstances(serviceName)
+ s.subscribe(url, notify, serviceName, serviceInstances)
+ listener := &registry.ServiceInstancesChangedListener{
+ ServiceName: serviceName,
+ ChangedNotify: &InstanceChangeNotify{
+ notify: notify,
+ serviceDiscoveryRegistry: s,
+ },
+ }
+ s.registerServiceInstancesChangedListener(*url, listener)
+ }
+ return nil
+}
+
+func (s *serviceDiscoveryRegistry) registerServiceInstancesChangedListener(url common.URL, listener *registry.ServiceInstancesChangedListener) {
+ listenerId := listener.ServiceName + ":" + getUrlKey(url)
+ if !s.subscribedServices.Contains(listenerId) {
+ err := s.serviceDiscovery.AddListener(listener)
+ if err != nil {
+ logger.Errorf("add listener[%s] catch error,url:%s err:%s", listenerId, url.String(), err.Error())
+ }
+ }
+
+}
+
+func getUrlKey(url common.URL) string {
+ var bf bytes.Buffer
+ if len(url.Protocol) != 0 {
+ bf.WriteString(url.Protocol)
+ bf.WriteString("://")
+ }
+ if len(url.Location) != 0 {
+ bf.WriteString(url.Location)
+ bf.WriteString(":")
+ bf.WriteString(url.Port)
+ }
+ if len(url.Path) != 0 {
+ bf.WriteString("/")
+ bf.WriteString(url.Path)
+ }
+ bf.WriteString("?")
+ appendParam(bf, constant.VERSION_KEY, url)
+ appendParam(bf, constant.GROUP_KEY, url)
+ appendParam(bf, constant.NACOS_PROTOCOL_KEY, url)
+ return bf.String()
+}
+
+func appendParam(buffer bytes.Buffer, paramKey string, url common.URL) {
+ buffer.WriteString(paramKey)
+ buffer.WriteString("=")
+ buffer.WriteString(url.GetParam(paramKey, ""))
+}
+
+func (s *serviceDiscoveryRegistry) subscribe(url *common.URL, notify registry.NotifyListener,
+ serviceName string, serviceInstances []registry.ServiceInstance) {
+ if len(serviceInstances) == 0 {
+ logger.Warnf("here is no instance in service[name : %s]", serviceName)
+ return
+ }
+ var subscribedURLs []common.URL
+ subscribedURLs = append(subscribedURLs, s.getExportedUrls(*url, serviceInstances)...)
+ if len(subscribedURLs) == 0 {
+ subscribedURLs = s.synthesizeSubscribedURLs(url, serviceInstances)
+ }
+ for _, url := range subscribedURLs {
+ notify.Notify(&registry.ServiceEvent{
+ Action: remoting.EventTypeAdd,
+ Service: url,
+ })
+ }
+
+}
+
+func (s *serviceDiscoveryRegistry) synthesizeSubscribedURLs(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []common.URL {
+ var urls []common.URL
+ for _, syn := range s.subscribedURLsSynthesizers {
+ if syn.Support(subscribedURL) {
+ urls = append(urls, syn.Synthesize(subscribedURL, serviceInstances)...)
+ }
+ }
+ return urls
+}
+
+func shouldSubscribe(url common.URL) bool {
+ return !shouldRegister(url)
+}
+
+func (s *serviceDiscoveryRegistry) getServices(url common.URL) *gxset.HashSet {
+ services := gxset.NewSet()
+ serviceNames := url.GetParam(constant.PROVIDER_BY, "")
+ if len(serviceNames) != 0 {
+ services = parseServices(serviceNames)
+ }
+ if services.Empty() {
+ services = s.findMappedServices(url)
+ if services.Empty() {
+ return s.subscribedServices
+ }
+ }
+ return services
+}
+
+func (s *serviceDiscoveryRegistry) findMappedServices(url common.URL) *gxset.HashSet {
+ serviceInterface := url.GetParam(constant.INTERFACE_KEY, url.Path)
+ group := url.GetParam(constant.GROUP_KEY, "")
+ version := url.GetParam(constant.VERSION_KEY, "")
+ protocol := url.Protocol
+ serviceNames, err := s.serviceNameMapping.Get(serviceInterface, group, version, protocol)
+ if err != nil {
+ logger.Errorf("get serviceInterface:[%s] group:[%s] version:[%s] protocol:[%s] from "+
+ "serviceNameMap error:%s", err.Error())
+ return gxset.NewSet()
+ }
+ return serviceNames
+}
+
+func (s *serviceDiscoveryRegistry) getExportedUrls(subscribedURL common.URL, serviceInstances []registry.ServiceInstance) []common.URL {
+ var filterInstances []registry.ServiceInstance
+ for _, s := range serviceInstances {
+ if !s.IsEnable() || !s.IsHealthy() {
+ continue
+ }
+ metaData := s.GetMetadata()
+ _, ok1 := metaData[constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME]
+ _, ok2 := metaData[constant.METADATA_SERVICE_URLS_PROPERTY_NAME]
+ if !ok1 && !ok2 {
+ continue
+ }
+ filterInstances = append(filterInstances, s)
+ }
+ if len(filterInstances) == 0 {
+ return []common.URL{}
+ }
+ s.prepareServiceRevisionExportedURLs(filterInstances)
+ subscribedURLs := s.cloneExportedURLs(subscribedURL, filterInstances)
+ return subscribedURLs
+}
+
+// comparator is defined as Comparator for skip list to compare the URL
+type comparator common.URL
+
+// Compare is defined as Comparator for skip list to compare the URL
+func (c comparator) Compare(comp cm.Comparator) int {
+ a := common.URL(c).String()
+ b := common.URL(comp.(comparator)).String()
+ switch {
+ case a > b:
+ return 1
+ case a < b:
+ return -1
+ default:
+ return 0
+ }
+}
+
+func (s *serviceDiscoveryRegistry) getExportedUrlsByInst(serviceInstance registry.ServiceInstance) []common.URL {
+ var urls []common.URL
+ metadataStorageType := getExportedStoreType(serviceInstance)
+ proxyFactory := extension.GetMetadataServiceProxyFactory(metadataStorageType)
+ if proxyFactory == nil {
+ return urls
+ }
+ metadataService := proxyFactory.GetProxy(serviceInstance)
+ if metadataService == nil {
+ return urls
+ }
+ result, err := metadataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE)
+ if err != nil {
+ logger.Errorf("get exported urls catch error:%s,instance:%+v", err.Error(), serviceInstance)
+ return urls
+ }
+
+ ret := make([]common.URL, 0, len(result))
+ for _, ui := range result {
+
+ u, err := common.NewURL(ui.(string))
+
+ if err != nil {
+ logger.Errorf("could not parse the url string to URL structure: %s", ui.(string), err)
+ continue
+ }
+ ret = append(ret, u)
+ }
+ return ret
+}
+
+func (s *serviceDiscoveryRegistry) prepareServiceRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+ s.lock.Lock()
+ // 1. expunge stale
+ s.expungeStaleRevisionExportedURLs(serviceInstances)
+ // 2. Initialize
+ s.initRevisionExportedURLs(serviceInstances)
+ s.lock.Unlock()
+}
+
+func (s *serviceDiscoveryRegistry) expungeStaleRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+ serviceName := serviceInstances[0].GetServiceName()
+ revisionExportedURLsMap, exist := s.serviceRevisionExportedURLsCache[serviceName]
+ if !exist {
+ return
+ }
+ existRevision := gxset.NewSet()
+ for k := range revisionExportedURLsMap {
+ existRevision.Add(k)
+ }
+ currentRevision := gxset.NewSet()
+ for _, s := range serviceInstances {
+ rv := getExportedServicesRevision(s)
+ if len(rv) != 0 {
+ currentRevision.Add(rv)
+ }
+ }
+ // staleRevisions = existedRevisions(copy) - currentRevisions
+ staleRevision := gxset.NewSet(existRevision.Values()...)
+ staleRevision.Remove(currentRevision.Values()...)
+ // remove exported URLs if staled
+ for _, s := range staleRevision.Values() {
+ delete(revisionExportedURLsMap, s.(string))
+ }
+}
+
+func (s *serviceDiscoveryRegistry) initRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+ // initialize the revision exported URLs that the selected service instance exported
+ s.initSelectedRevisionExportedURLs(serviceInstances)
+ // initialize the revision exported URLs that other service instances exported
+ for _, serviceInstance := range serviceInstances {
+ s.initRevisionExportedURLsByInst(serviceInstance)
+ }
+}
+
+func (s *serviceDiscoveryRegistry) initSelectedRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+ for range serviceInstances {
+ selectServiceInstance := s.selectServiceInstance(serviceInstances)
+ revisionExportedURLs := s.initRevisionExportedURLsByInst(selectServiceInstance)
+ if len(revisionExportedURLs) != 0 {
Review comment:
```suggestion
if len(revisionExportedURLs) > 0 {
```
?
##########
File path: registry/servicediscovery/service_discovery_registry.go
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package servicediscovery
+
+import (
+ "bytes"
+ "encoding/json"
+ "strconv"
+ "strings"
+ "sync"
+)
+
+import (
+ cm "github.com/Workiva/go-datastructures/common"
+ gxset "github.com/dubbogo/gost/container/set"
+ gxnet "github.com/dubbogo/gost/net"
+ perrors "github.com/pkg/errors"
+ "go.uber.org/atomic"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/common/observer"
+ "github.com/apache/dubbo-go/config"
+ "github.com/apache/dubbo-go/metadata/mapping"
+ "github.com/apache/dubbo-go/metadata/service"
+ "github.com/apache/dubbo-go/metadata/service/exporter/configurable"
+ "github.com/apache/dubbo-go/registry"
+ "github.com/apache/dubbo-go/registry/event"
+ "github.com/apache/dubbo-go/registry/servicediscovery/synthesizer"
+ "github.com/apache/dubbo-go/remoting"
+)
+
+const (
+ protocolName = "service-discovery"
+)
+
+func init() {
+ extension.SetRegistry(protocolName, newServiceDiscoveryRegistry)
+}
+
+// serviceDiscoveryRegistry is the implementation of application-level registry.
+// It's completely different from other registry implementations
+// This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping
+// In order to keep compatible with interface-level registry,
+// this implementation is
+type serviceDiscoveryRegistry struct {
+ lock sync.RWMutex
+ url *common.URL
+ serviceDiscovery registry.ServiceDiscovery
+ subscribedServices *gxset.HashSet
+ serviceNameMapping mapping.ServiceNameMapping
+ metaDataService service.MetadataService
+ registeredListeners *gxset.HashSet
+ subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer
+ serviceRevisionExportedURLsCache map[string]map[string][]common.URL
+}
+
+func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
+
+ tryInitMetadataService()
+
+ serviceDiscovery, err := creatServiceDiscovery(url)
+ if err != nil {
+ return nil, err
+ }
+ subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, ""))
+ subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer()
+ serviceNameMapping := extension.GetGlobalServiceNameMapping()
+ metaDataService, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "could not init metadata service")
+ }
+ return &serviceDiscoveryRegistry{
+ url: url,
+ serviceDiscovery: serviceDiscovery,
+ subscribedServices: subscribedServices,
+ subscribedURLsSynthesizers: subscribedURLsSynthesizers,
+ registeredListeners: gxset.NewSet(),
+ serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL, 8),
+ serviceNameMapping: serviceNameMapping,
+ metaDataService: metaDataService,
+ }, nil
+}
+
+func (s *serviceDiscoveryRegistry) UnRegister(url common.URL) error {
+ if !shouldRegister(url) {
+ return nil
+ }
+ return s.metaDataService.UnexportURL(url)
+}
+
+func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registry.NotifyListener) error {
+ if !shouldSubscribe(*url) {
+ return nil
+ }
+ return s.metaDataService.UnsubscribeURL(*url)
+}
+
+func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
+ sdcName := url.GetParam(constant.SERVICE_DISCOVERY_KEY, "")
+ sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(sdcName)
+ if !ok {
+ return nil, perrors.Errorf("The service discovery with name: %s is not found", sdcName)
+ }
+ originServiceDiscovery, err := extension.GetServiceDiscovery(sdc.Protocol, sdcName)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "Create service discovery fialed")
+ }
+ return event.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil
+}
+
+func parseServices(literalServices string) *gxset.HashSet {
+ set := gxset.NewSet()
+ if len(literalServices) == 0 {
+ return set
+ }
+ var splitServices = strings.Split(literalServices, ",")
+ for _, s := range splitServices {
+ if len(s) != 0 {
+ set.Add(s)
+ }
+ }
+ return set
+}
+
+func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery {
+ return s.serviceDiscovery
+}
+
+func (s *serviceDiscoveryRegistry) GetUrl() common.URL {
+ return *s.url
+}
+
+func (s *serviceDiscoveryRegistry) IsAvailable() bool {
+ // TODO(whether available depends on metadata service and service discovery)
+ return true
+}
+
+func (s *serviceDiscoveryRegistry) Destroy() {
+ err := s.serviceDiscovery.Destroy()
+ if err != nil {
+ logger.Errorf("destroy serviceDiscovery catch error:%s", err.Error())
+ }
+}
+
+func (s *serviceDiscoveryRegistry) Register(url common.URL) error {
+ if !shouldRegister(url) {
+ return nil
+ }
+ ok, err := s.metaDataService.ExportURL(url)
+
+ if err != nil {
+ logger.Errorf("The URL[%s] registry catch error:%s!", url.String(), err.Error())
+ return err
+ }
+ if !ok {
+ logger.Warnf("The URL[%s] has been registry!", url.String())
+ }
+
+ // we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap
+ // But we don't want to design a similar bootstrap class.
+ ins, err := createInstance(url)
+ if err != nil {
+ return perrors.WithMessage(err, "could not create servcie instance, please check your service url")
+ }
+
+ err = s.serviceDiscovery.Register(ins)
+ if err != nil {
+ return perrors.WithMessage(err, "register the service failed")
+ }
+
+ err = s.metaDataService.PublishServiceDefinition(url)
+ if err != nil {
+ return perrors.WithMessage(err, "publish the service definition failed. ")
+ }
+ return s.serviceNameMapping.Map(url.GetParam(constant.INTERFACE_KEY, ""),
+ url.GetParam(constant.GROUP_KEY, ""),
+ url.GetParam(constant.Version, ""),
+ url.Protocol)
+}
+
+func createInstance(url common.URL) (registry.ServiceInstance, error) {
+ appConfig := config.GetApplicationConfig()
+ port, err := strconv.ParseInt(url.Port, 10, 32)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "invalid port: "+url.Port)
+ }
+
+ host := url.Ip
+ if len(host) == 0 {
+ host, err = gxnet.GetLocalIP()
+ if err != nil {
+ return nil, perrors.WithMessage(err, "could not get the local Ip")
+ }
+ }
+
+ // usually we will add more metadata
+ metadata := make(map[string]string, 8)
+ metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType
+
+ return &registry.DefaultServiceInstance{
+ ServiceName: appConfig.Name,
+ Host: host,
+ Port: int(port),
+ Id: host + constant.KEY_SEPARATOR + url.Port,
+ Enable: true,
+ Healthy: true,
+ Metadata: metadata,
+ }, nil
+}
+
+func shouldRegister(url common.URL) bool {
+ side := url.GetParam(constant.SIDE_KEY, "")
+ if side == constant.PROVIDER_PROTOCOL {
+ return true
+ }
+ logger.Debugf("The URL should not be register.", url.String())
+ return false
+}
+
+func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) error {
+ if !shouldSubscribe(*url) {
+ return nil
+ }
+ _, err := s.metaDataService.SubscribeURL(*url)
+ if err != nil {
+ return perrors.WithMessage(err, "subscribe url error: "+url.String())
+ }
+ services := s.getServices(*url)
+ if services.Empty() {
+ return perrors.Errorf("Should has at least one way to know which services this interface belongs to, "+
+ "subscription url:%s", url.String())
+ }
+ for _, srv := range services.Values() {
+ serviceName := srv.(string)
+ serviceInstances := s.serviceDiscovery.GetInstances(serviceName)
+ s.subscribe(url, notify, serviceName, serviceInstances)
+ listener := &registry.ServiceInstancesChangedListener{
+ ServiceName: serviceName,
+ ChangedNotify: &InstanceChangeNotify{
+ notify: notify,
+ serviceDiscoveryRegistry: s,
+ },
+ }
+ s.registerServiceInstancesChangedListener(*url, listener)
+ }
+ return nil
+}
+
+func (s *serviceDiscoveryRegistry) registerServiceInstancesChangedListener(url common.URL, listener *registry.ServiceInstancesChangedListener) {
+ listenerId := listener.ServiceName + ":" + getUrlKey(url)
+ if !s.subscribedServices.Contains(listenerId) {
+ err := s.serviceDiscovery.AddListener(listener)
+ if err != nil {
+ logger.Errorf("add listener[%s] catch error,url:%s err:%s", listenerId, url.String(), err.Error())
+ }
+ }
+
+}
+
+func getUrlKey(url common.URL) string {
+ var bf bytes.Buffer
+ if len(url.Protocol) != 0 {
+ bf.WriteString(url.Protocol)
+ bf.WriteString("://")
+ }
+ if len(url.Location) != 0 {
+ bf.WriteString(url.Location)
+ bf.WriteString(":")
+ bf.WriteString(url.Port)
+ }
+ if len(url.Path) != 0 {
+ bf.WriteString("/")
+ bf.WriteString(url.Path)
+ }
+ bf.WriteString("?")
+ appendParam(bf, constant.VERSION_KEY, url)
+ appendParam(bf, constant.GROUP_KEY, url)
+ appendParam(bf, constant.NACOS_PROTOCOL_KEY, url)
+ return bf.String()
+}
+
+func appendParam(buffer bytes.Buffer, paramKey string, url common.URL) {
+ buffer.WriteString(paramKey)
+ buffer.WriteString("=")
+ buffer.WriteString(url.GetParam(paramKey, ""))
+}
+
+func (s *serviceDiscoveryRegistry) subscribe(url *common.URL, notify registry.NotifyListener,
+ serviceName string, serviceInstances []registry.ServiceInstance) {
+ if len(serviceInstances) == 0 {
+ logger.Warnf("here is no instance in service[name : %s]", serviceName)
+ return
+ }
+ var subscribedURLs []common.URL
+ subscribedURLs = append(subscribedURLs, s.getExportedUrls(*url, serviceInstances)...)
+ if len(subscribedURLs) == 0 {
+ subscribedURLs = s.synthesizeSubscribedURLs(url, serviceInstances)
+ }
+ for _, url := range subscribedURLs {
+ notify.Notify(&registry.ServiceEvent{
+ Action: remoting.EventTypeAdd,
+ Service: url,
+ })
+ }
+
+}
+
+func (s *serviceDiscoveryRegistry) synthesizeSubscribedURLs(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []common.URL {
+ var urls []common.URL
+ for _, syn := range s.subscribedURLsSynthesizers {
+ if syn.Support(subscribedURL) {
+ urls = append(urls, syn.Synthesize(subscribedURL, serviceInstances)...)
+ }
+ }
+ return urls
+}
+
+func shouldSubscribe(url common.URL) bool {
+ return !shouldRegister(url)
+}
+
+func (s *serviceDiscoveryRegistry) getServices(url common.URL) *gxset.HashSet {
+ services := gxset.NewSet()
+ serviceNames := url.GetParam(constant.PROVIDER_BY, "")
+ if len(serviceNames) != 0 {
+ services = parseServices(serviceNames)
+ }
+ if services.Empty() {
+ services = s.findMappedServices(url)
+ if services.Empty() {
+ return s.subscribedServices
+ }
+ }
+ return services
+}
+
+func (s *serviceDiscoveryRegistry) findMappedServices(url common.URL) *gxset.HashSet {
+ serviceInterface := url.GetParam(constant.INTERFACE_KEY, url.Path)
+ group := url.GetParam(constant.GROUP_KEY, "")
+ version := url.GetParam(constant.VERSION_KEY, "")
+ protocol := url.Protocol
+ serviceNames, err := s.serviceNameMapping.Get(serviceInterface, group, version, protocol)
+ if err != nil {
+ logger.Errorf("get serviceInterface:[%s] group:[%s] version:[%s] protocol:[%s] from "+
+ "serviceNameMap error:%s", err.Error())
+ return gxset.NewSet()
+ }
+ return serviceNames
+}
+
+func (s *serviceDiscoveryRegistry) getExportedUrls(subscribedURL common.URL, serviceInstances []registry.ServiceInstance) []common.URL {
+ var filterInstances []registry.ServiceInstance
+ for _, s := range serviceInstances {
+ if !s.IsEnable() || !s.IsHealthy() {
+ continue
+ }
+ metaData := s.GetMetadata()
+ _, ok1 := metaData[constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME]
+ _, ok2 := metaData[constant.METADATA_SERVICE_URLS_PROPERTY_NAME]
+ if !ok1 && !ok2 {
+ continue
+ }
+ filterInstances = append(filterInstances, s)
+ }
+ if len(filterInstances) == 0 {
+ return []common.URL{}
+ }
+ s.prepareServiceRevisionExportedURLs(filterInstances)
+ subscribedURLs := s.cloneExportedURLs(subscribedURL, filterInstances)
+ return subscribedURLs
+}
+
+// comparator is defined as Comparator for skip list to compare the URL
+type comparator common.URL
+
+// Compare is defined as Comparator for skip list to compare the URL
+func (c comparator) Compare(comp cm.Comparator) int {
+ a := common.URL(c).String()
+ b := common.URL(comp.(comparator)).String()
+ switch {
+ case a > b:
+ return 1
+ case a < b:
+ return -1
+ default:
+ return 0
+ }
+}
+
+func (s *serviceDiscoveryRegistry) getExportedUrlsByInst(serviceInstance registry.ServiceInstance) []common.URL {
+ var urls []common.URL
+ metadataStorageType := getExportedStoreType(serviceInstance)
+ proxyFactory := extension.GetMetadataServiceProxyFactory(metadataStorageType)
+ if proxyFactory == nil {
+ return urls
+ }
+ metadataService := proxyFactory.GetProxy(serviceInstance)
+ if metadataService == nil {
+ return urls
+ }
+ result, err := metadataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE)
+ if err != nil {
+ logger.Errorf("get exported urls catch error:%s,instance:%+v", err.Error(), serviceInstance)
+ return urls
+ }
+
+ ret := make([]common.URL, 0, len(result))
+ for _, ui := range result {
+
+ u, err := common.NewURL(ui.(string))
+
+ if err != nil {
+ logger.Errorf("could not parse the url string to URL structure: %s", ui.(string), err)
+ continue
+ }
+ ret = append(ret, u)
+ }
+ return ret
+}
+
+func (s *serviceDiscoveryRegistry) prepareServiceRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+ s.lock.Lock()
+ // 1. expunge stale
+ s.expungeStaleRevisionExportedURLs(serviceInstances)
+ // 2. Initialize
+ s.initRevisionExportedURLs(serviceInstances)
+ s.lock.Unlock()
+}
+
+func (s *serviceDiscoveryRegistry) expungeStaleRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+ serviceName := serviceInstances[0].GetServiceName()
+ revisionExportedURLsMap, exist := s.serviceRevisionExportedURLsCache[serviceName]
+ if !exist {
+ return
+ }
+ existRevision := gxset.NewSet()
+ for k := range revisionExportedURLsMap {
+ existRevision.Add(k)
+ }
+ currentRevision := gxset.NewSet()
+ for _, s := range serviceInstances {
+ rv := getExportedServicesRevision(s)
+ if len(rv) != 0 {
+ currentRevision.Add(rv)
+ }
+ }
+ // staleRevisions = existedRevisions(copy) - currentRevisions
+ staleRevision := gxset.NewSet(existRevision.Values()...)
+ staleRevision.Remove(currentRevision.Values()...)
+ // remove exported URLs if staled
+ for _, s := range staleRevision.Values() {
+ delete(revisionExportedURLsMap, s.(string))
+ }
+}
+
+func (s *serviceDiscoveryRegistry) initRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+ // initialize the revision exported URLs that the selected service instance exported
+ s.initSelectedRevisionExportedURLs(serviceInstances)
+ // initialize the revision exported URLs that other service instances exported
+ for _, serviceInstance := range serviceInstances {
+ s.initRevisionExportedURLsByInst(serviceInstance)
+ }
+}
+
+func (s *serviceDiscoveryRegistry) initSelectedRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+ for range serviceInstances {
+ selectServiceInstance := s.selectServiceInstance(serviceInstances)
+ revisionExportedURLs := s.initRevisionExportedURLsByInst(selectServiceInstance)
+ if len(revisionExportedURLs) != 0 {
+ // If the result is valid,break
+ break
+ }
+ }
+}
+
+func (s *serviceDiscoveryRegistry) selectServiceInstance(serviceInstances []registry.ServiceInstance) registry.ServiceInstance {
+ size := len(serviceInstances)
+ if size == 0 {
+ return nil
+ }
+ if size == 1 {
+ return serviceInstances[0]
+ }
+ selectorName := s.url.GetParam(constant.SERVICE_INSTANCE_SELECTOR, "random")
+ selector, err := extension.GetServiceInstanceSelector(selectorName)
+ if err != nil {
+ logger.Errorf("get service instance selector cathe error:%s", err.Error())
+ return nil
+ }
+ return selector.Select(*s.url, serviceInstances)
+}
+
+func (s *serviceDiscoveryRegistry) initRevisionExportedURLsByInst(serviceInstance registry.ServiceInstance) []common.URL {
+ if serviceInstance == nil {
+ return []common.URL{}
+ }
+ serviceName := serviceInstance.GetServiceName()
+ revision := getExportedServicesRevision(serviceInstance)
+ revisionExportedURLsMap := s.serviceRevisionExportedURLsCache[serviceName]
+ if revisionExportedURLsMap == nil {
+ revisionExportedURLsMap = make(map[string][]common.URL, 4)
+ s.serviceRevisionExportedURLsCache[serviceName] = revisionExportedURLsMap
+ }
+ revisionExportedURLs := revisionExportedURLsMap[revision]
+ firstGet := false
+ if len(revisionExportedURLs) == 0 {
+ if len(revisionExportedURLsMap) > 0 {
+ // The current ServiceInstance has a revision that is not cached yet
+ logger.Warnf("The ServiceInstance[id: %s, host : %s , port : %v] has different revision : %s"+
+ ", please make sure the service [name : %s] is changing or not.", serviceInstance.GetId(),
+ serviceInstance.GetHost(), serviceInstance.GetPort(), revision, serviceInstance.GetServiceName())
+ } else {
+ firstGet = true
+ }
+ revisionExportedURLs = s.getExportedUrlsByInst(serviceInstance)
+ if revisionExportedURLs != nil {
+ revisionExportedURLsMap[revision] = revisionExportedURLs
+ logger.Debugf("Get the exported URLs[size : %d, first : %t] from the target service "+
+ "instance [id: %s , service : %s , host : %s , port : %v , revision : %s]",
+ len(revisionExportedURLs), firstGet, serviceInstance.GetId(), serviceInstance.GetServiceName(),
+ serviceInstance.GetHost(), serviceInstance.GetPort(), revision)
+ }
+ } else {
+ // Otherwise the cache is hit
+ logger.Debugf("Get the exported URLs[size : %d] from cache, the instance"+
+ "[id: %s , service : %s , host : %s , port : %v , revision : %s]", len(revisionExportedURLs),
+ serviceInstance.GetId(), serviceInstance.GetServiceName(), serviceInstance.GetHost(),
+ serviceInstance.GetPort(), revision)
+ }
+ return revisionExportedURLs
+}
+
+func getExportedServicesRevision(serviceInstance registry.ServiceInstance) string {
+ metaData := serviceInstance.GetMetadata()
+ return metaData[constant.EXPORTED_SERVICES_REVISION_PROPERTY_NAME]
+}
+
+func getExportedStoreType(serviceInstance registry.ServiceInstance) string {
+ metaData := serviceInstance.GetMetadata()
+ result, ok := metaData[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME]
+ if !ok {
+ return constant.DEFAULT_METADATA_STORAGE_TYPE
+ }
+ return result
+}
+
+func (s *serviceDiscoveryRegistry) cloneExportedURLs(url common.URL, serviceInstances []registry.ServiceInstance) []common.URL {
+ if len(serviceInstances) == 0 {
+ return []common.URL{}
+ }
+ var clonedExportedURLs []common.URL
+ removeParamSet := gxset.NewSet()
+ removeParamSet.Add(constant.PID_KEY)
+ removeParamSet.Add(constant.TIMESTAMP_KEY)
+ for _, serviceInstance := range serviceInstances {
+ templateExportURLs := s.getTemplateExportedURLs(url, serviceInstance)
+ host := serviceInstance.GetHost()
+ for _, u := range templateExportURLs {
+ port := strconv.Itoa(getProtocolPort(serviceInstance, u.Protocol))
+ if u.Location != host || u.Port != port {
+ u.Port = port // reset port
+ u.Location = host + ":" + port // reset host
+ }
+
+ cloneUrl := u.CloneExceptParams(removeParamSet)
+ clonedExportedURLs = append(clonedExportedURLs, *cloneUrl)
+ }
+ }
+ return clonedExportedURLs
+
+}
+
+type endpoint struct {
+ Port int `json:"port,omitempty"`
+ Protocol string `json:"protocol,omitempty"`
+}
+
+func getProtocolPort(serviceInstance registry.ServiceInstance, protocol string) int {
+ md := serviceInstance.GetMetadata()
+ rawEndpoints := md[constant.SERVICE_INSTANCE_ENDPOINTS]
+ if len(rawEndpoints) == 0 {
+ return -1
+ }
+ var endpoints []endpoint
+ err := json.Unmarshal([]byte(rawEndpoints), &endpoints)
+ if err != nil {
+ logger.Errorf("json unmarshal rawEndpoints[%s] catch error:%s", rawEndpoints, err.Error())
+ return -1
+ }
+ for _, e := range endpoints {
+ if e.Protocol == protocol {
+ return e.Port
+ }
+ }
+ return -1
+}
+func (s *serviceDiscoveryRegistry) getTemplateExportedURLs(url common.URL, serviceInstance registry.ServiceInstance) []common.URL {
+ exportedURLs := s.getRevisionExportedURLs(serviceInstance)
+ if len(exportedURLs) == 0 {
+ return []common.URL{}
+ }
+ return filterSubscribedURLs(url, exportedURLs)
+}
+
+func (s *serviceDiscoveryRegistry) getRevisionExportedURLs(serviceInstance registry.ServiceInstance) []common.URL {
+ if serviceInstance == nil {
+ return []common.URL{}
+ }
+ serviceName := serviceInstance.GetServiceName()
+ revision := getExportedServicesRevision(serviceInstance)
+ s.lock.RLock()
+ // release the read lock on every return path, including the early returns below
+ defer s.lock.RUnlock()
+ revisionExportedURLsMap, exist := s.serviceRevisionExportedURLsCache[serviceName]
+ if !exist {
+ return []common.URL{}
+ }
+ exportedURLs, exist := revisionExportedURLsMap[revision]
+ if !exist {
+ return []common.URL{}
+ }
+ // Get a copy from source in order to prevent the caller trying to change the cached data
+ cloneExportedURLs := make([]common.URL, len(exportedURLs))
+ copy(cloneExportedURLs, exportedURLs)
+ return cloneExportedURLs
+}
+
+func filterSubscribedURLs(subscribedURL common.URL, exportedURLs []common.URL) []common.URL {
+ var filterExportedURLs []common.URL
+ for _, url := range exportedURLs {
+ // skip a URL that does not match and keep checking the remaining ones
+ if url.GetParam(constant.INTERFACE_KEY, url.Path) != subscribedURL.GetParam(constant.INTERFACE_KEY, url.Path) {
+ continue
+ }
+ if url.GetParam(constant.VERSION_KEY, "") != subscribedURL.GetParam(constant.VERSION_KEY, "") {
+ continue
+ }
+ if url.GetParam(constant.GROUP_KEY, "") != subscribedURL.GetParam(constant.GROUP_KEY, "") {
+ continue
+ }
+ if len(subscribedURL.Protocol) != 0 {
+ if subscribedURL.Protocol != url.Protocol {
+ continue
+ }
+ }
+ filterExportedURLs = append(filterExportedURLs, url)
+ }
+ return filterExportedURLs
+}
+
+type InstanceChangeNotify struct {
+ notify registry.NotifyListener
+ serviceDiscoveryRegistry *serviceDiscoveryRegistry
+}
+
+func (icn *InstanceChangeNotify) Notify(event observer.Event) {
+
+ if se, ok := event.(*registry.ServiceInstancesChangedEvent); ok {
+ sdr := icn.serviceDiscoveryRegistry
+ sdr.subscribe(sdr.url, icn.notify, se.ServiceName, se.Instances)
+ }
+}
+
+var (
+ exporting = &atomic.Bool{}
+)
+
+// tryInitMetadataService will try to initialize metadata service
+// TODO (move to somewhere)
Review comment:
move to where?
[GitHub] [dubbo-go] flycash commented on a change in pull request #604: [WIP] Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
flycash commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r440128144
##########
File path: config/config_loader.go
##########
@@ -233,39 +250,79 @@ func RPCService(service common.RPCService) {
// GetMetricConfig find the MetricConfig
// if it is nil, create a new one
+// we use double-check to reduce race condition
+// In general, it will be locked 0 or 1 time.
+// So you don't need to worry about the race condition
func GetMetricConfig() *MetricConfig {
- if metricConfig == nil {
- metricConfig = &MetricConfig{}
+ if GetBaseConfig().MetricConfig == nil {
+ configAccessMutex.Lock()
+ defer configAccessMutex.Unlock()
+ if GetBaseConfig().MetricConfig == nil {
+ GetBaseConfig().MetricConfig = &MetricConfig{}
+ }
}
- return metricConfig
+ return GetBaseConfig().MetricConfig
}
// GetApplicationConfig find the application config
// if not, we will create one
// Usually applicationConfig will be initialized when system start
+// we use double-check to reduce race condition
+// In general, it will be locked 0 or 1 time.
+// So you don't need to worry about the race condition
func GetApplicationConfig() *ApplicationConfig {
- if applicationConfig == nil {
- applicationConfig = &ApplicationConfig{}
+ if GetBaseConfig().ApplicationConfig == nil {
+ configAccessMutex.Lock()
+ defer configAccessMutex.Unlock()
+ if GetBaseConfig().ApplicationConfig == nil {
+ GetBaseConfig().ApplicationConfig = &ApplicationConfig{}
+ }
}
- return applicationConfig
+ return GetBaseConfig().ApplicationConfig
}
// GetProviderConfig find the provider config
// if not found, create new one
func GetProviderConfig() ProviderConfig {
if providerConfig == nil {
- logger.Warnf("providerConfig is nil!")
- return ProviderConfig{}
+ logger.Warnf("providerConfig is nil! we will try to create one")
+ if providerConfig == nil {
+ return ProviderConfig{}
+ }
}
return *providerConfig
}
// GetConsumerConfig find the consumer config
// if not found, create new one
+// we use double-check to reduce race condition
+// In general, it will be locked 0 or 1 time.
+// So you don't need to worry about the race condition
func GetConsumerConfig() ConsumerConfig {
if consumerConfig == nil {
- logger.Warnf("consumerConfig is nil!")
- return ConsumerConfig{}
+ if consumerConfig == nil {
+ return ConsumerConfig{}
+ }
}
return *consumerConfig
}
+
+func GetBaseConfig() *BaseConfig {
+
+ if baseConfig == nil {
Review comment:
Why?
We initialize baseConfig so that it is never nil.
[GitHub] [dubbo-go] AlexStocks commented on a change in pull request #604: Ftr: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
AlexStocks commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r443191365
##########
File path: config/service_config.go
##########
@@ -73,14 +73,18 @@ type ServiceConfig struct {
ParamSign string `yaml:"param.sign" json:"param.sign,omitempty" property:"param.sign"`
Tag string `yaml:"tag" json:"tag,omitempty" property:"tag"`
+ Protocols map[string]*ProtocolConfig
unexported *atomic.Bool
exported *atomic.Bool
rpcService common.RPCService
- cacheProtocol protocol.Protocol
cacheMutex sync.Mutex
+ cacheProtocol protocol.Protocol
+
+ exportersLock sync.Mutex
+ exporters []protocol.Exporter
}
-// nolint
+// Prefix return dubbo.service.${interface}.
Review comment:
Prefix returns dubbo.service.${interface}.
##########
File path: common/extension/event_dispatcher.go
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+ "sync"
+)
+
+import (
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/common/observer"
+)
+
+var (
+ globalEventDispatcher observer.EventDispatcher
+ initEventListeners []func() observer.EventListener
+ initEventOnce sync.Once
+)
+
+var (
+ dispatchers = make(map[string]func() observer.EventDispatcher, 8)
+)
+
+// SetEventDispatcher only registers the factory; it does not initialize the global dispatcher
+func SetEventDispatcher(name string, v func() observer.EventDispatcher) {
+ dispatchers[name] = v
+}
+
+// SetAndInitGlobalDispatcher actually initializes the global dispatcher.
+// If a global dispatcher already exists, it will be overridden.
+// If no dispatcher is registered under the name, it panics.
+func SetAndInitGlobalDispatcher(name string) {
+ if len(name) == 0 {
+ name = "direct"
+ }
+ if globalEventDispatcher != nil {
+ logger.Warnf("EventDispatcher has been initialized. It will be replaced")
+ }
+
+ if dp, ok := dispatchers[name]; !ok || dp == nil {
+ panic("EventDispatcher for " + name + " is not found, make sure you have imported the package, " +
+ "like github.com/apache/dubbo-go/common/observer/dispatcher ")
Review comment:
like 'import _ "github.com/apache/dubbo-go/common/observer/dispatcher"'
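A minimal sketch of why the blank import matters, assuming the usual Go pattern where an implementation registers its factory from init(). The names Dispatcher, Register, and MustGet are hypothetical stand-ins and are not the dubbo-go API; in the real layout the init function would live in the implementation package and would run only when that package is imported.

```go
package main

import "fmt"

// Dispatcher is a hypothetical stand-in for observer.EventDispatcher.
type Dispatcher interface {
	Name() string
}

type directDispatcher struct{}

func (directDispatcher) Name() string { return "direct" }

// factories maps a dispatcher name to its constructor.
var factories = make(map[string]func() Dispatcher, 8)

// Register stores a factory; it does not create any dispatcher yet.
func Register(name string, f func() Dispatcher) {
	factories[name] = f
}

// In a multi-package layout this init would sit in the implementation
// package, so it runs only if that package is (blank-)imported.
func init() {
	Register("direct", func() Dispatcher { return directDispatcher{} })
}

// MustGet panics with an actionable hint when nothing is registered.
func MustGet(name string) Dispatcher {
	f, ok := factories[name]
	if !ok || f == nil {
		panic("dispatcher " + name + " is not registered; " +
			"did you forget a blank import of the implementation package?")
	}
	return f()
}

func main() {
	fmt.Println(MustGet("direct").Name())
}
```

Spelling out the full import statement in the panic message lets a user copy it straight into their code, which seems to be the intent of the suggestion above.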
##########
File path: config/service_config.go
##########
@@ -125,7 +141,7 @@ func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List {
return ports
}
-// Export ...
+// Export export the service
Review comment:
Export exports the service
[GitHub] [dubbo-go] hxmhlt commented on a change in pull request #604: Ftr: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
hxmhlt commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r453197539
##########
File path: common/constant/key.go
##########
@@ -155,6 +163,14 @@ const (
NACOS_USERNAME = "username"
)
Review comment:
Move the constant into the nacos directory.
##########
File path: common/constant/key.go
##########
@@ -155,6 +163,14 @@ const (
NACOS_USERNAME = "username"
)
+const (
+ ZOOKEEPER_KEY = "zookeeper"
+)
+
+const (
+ ETCDV3_KEY = "etcdv3"
+)
Review comment:
Move the constant into the etcd directory.
##########
File path: common/constant/key.go
##########
@@ -155,6 +163,14 @@ const (
NACOS_USERNAME = "username"
)
+const (
+ ZOOKEEPER_KEY = "zookeeper"
+)
Review comment:
Move the constant into the zookeeper directory.
##########
File path: config/base_config.go
##########
@@ -64,7 +83,7 @@ func (c *BaseConfig) startConfigCenter() error {
if c.prepareEnvironment() != nil {
return perrors.WithMessagef(err, "start config center error!")
}
- //c.fresh()
+ // c.fresh()
return err
Review comment:
OK
[GitHub] [dubbo-go] AlexStocks commented on a change in pull request #604: Ftr: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
AlexStocks commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r452700965
##########
File path: remoting/zookeeper/curator_discovery/service_discovery.go
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package curator_discovery
+
+import (
+ "encoding/json"
+ "path"
+ "strings"
+ "sync"
+
+ "github.com/dubbogo/go-zookeeper/zk"
Review comment:
Please move it to the third import block.
[GitHub] [dubbo-go] flycash commented on a change in pull request #604: [WIP] Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
flycash commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r440679930
##########
File path: config/config_loader.go
##########
@@ -233,39 +250,79 @@ func RPCService(service common.RPCService) {
// GetMetricConfig find the MetricConfig
// if it is nil, create a new one
+// we use double-check to reduce race condition
+// In general, it will be locked 0 or 1 time.
+// So you don't need to worry about the race condition
func GetMetricConfig() *MetricConfig {
- if metricConfig == nil {
- metricConfig = &MetricConfig{}
+ if GetBaseConfig().MetricConfig == nil {
+ configAccessMutex.Lock()
+ defer configAccessMutex.Unlock()
+ if GetBaseConfig().MetricConfig == nil {
+ GetBaseConfig().MetricConfig = &MetricConfig{}
+ }
}
- return metricConfig
+ return GetBaseConfig().MetricConfig
}
// GetApplicationConfig find the application config
// if not, we will create one
// Usually applicationConfig will be initialized when system start
+// we use double-check to reduce race condition
+// In general, it will be locked 0 or 1 time.
+// So you don't need to worry about the race condition
func GetApplicationConfig() *ApplicationConfig {
- if applicationConfig == nil {
- applicationConfig = &ApplicationConfig{}
+ if GetBaseConfig().ApplicationConfig == nil {
+ configAccessMutex.Lock()
+ defer configAccessMutex.Unlock()
+ if GetBaseConfig().ApplicationConfig == nil {
+ GetBaseConfig().ApplicationConfig = &ApplicationConfig{}
+ }
}
- return applicationConfig
+ return GetBaseConfig().ApplicationConfig
}
// GetProviderConfig find the provider config
// if not found, create new one
func GetProviderConfig() ProviderConfig {
if providerConfig == nil {
- logger.Warnf("providerConfig is nil!")
- return ProviderConfig{}
+ logger.Warnf("providerConfig is nil! we will try to create one")
+ if providerConfig == nil {
+ return ProviderConfig{}
+ }
}
return *providerConfig
}
// GetConsumerConfig find the consumer config
// if not found, create new one
+// we use double-check to reduce race condition
+// In general, it will be locked 0 or 1 time.
+// So you don't need to worry about the race condition
func GetConsumerConfig() ConsumerConfig {
if consumerConfig == nil {
- logger.Warnf("consumerConfig is nil!")
- return ConsumerConfig{}
+ if consumerConfig == nil {
+ return ConsumerConfig{}
+ }
}
return *consumerConfig
}
+
+func GetBaseConfig() *BaseConfig {
+
+ if baseConfig == nil {
Review comment:
Usually it is initialized by reading the config file. It can be nil only in unit tests.
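For the nil-in-unit-tests case, a minimal sketch of a getter that lazily creates the config and stays race-free. It is not the code in this PR: the BaseConfig fields, SetBaseConfig, and baseConfigLock are assumptions for illustration. Unlike a plain double-check, both nil checks here happen while holding the lock.

```go
package main

import (
	"fmt"
	"sync"
)

// BaseConfig is a hypothetical stand-in for the real config struct.
type BaseConfig struct {
	Name string
}

var (
	baseConfig     *BaseConfig
	baseConfigLock sync.RWMutex
)

// SetBaseConfig simulates what loading the config file would do.
func SetBaseConfig(c *BaseConfig) {
	baseConfigLock.Lock()
	defer baseConfigLock.Unlock()
	baseConfig = c
}

// GetBaseConfig returns the loaded config, or lazily creates an empty
// one (the unit-test case where no config file was read).
func GetBaseConfig() *BaseConfig {
	baseConfigLock.RLock()
	c := baseConfig
	baseConfigLock.RUnlock()
	if c != nil {
		return c
	}
	baseConfigLock.Lock()
	defer baseConfigLock.Unlock()
	if baseConfig == nil { // second check, now under the write lock
		baseConfig = &BaseConfig{}
	}
	return baseConfig
}

func main() {
	var wg sync.WaitGroup
	results := make([]*BaseConfig, 8)
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = GetBaseConfig()
		}(i)
	}
	wg.Wait()
	for _, r := range results {
		if r != results[0] {
			fmt.Println("got different instances")
			return
		}
	}
	fmt.Println("all goroutines observed the same instance")
}
```

The read lock keeps the common path cheap once the config exists; the write lock is taken at most once per missing config.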
[GitHub] [dubbo-go] zouyx commented on a change in pull request #604: Ftr: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
zouyx commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r445286737
##########
File path: config/router_config_test.go
##########
@@ -53,15 +52,3 @@ func TestString(t *testing.T) {
assert.Equal(t, n2[0], "a1")
assert.Equal(t, n2[1], "")
}
-
-func TestRouterInit(t *testing.T) {
Review comment:
Why was this test case deleted?
##########
File path: registry/event/event_publishing_service_deiscovery_test.go
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ "reflect"
+ "testing"
+
+ "github.com/apache/dubbo-go/metadata/mapping"
Review comment:
Split this into a separate import block.
##########
File path: registry/event/event_publishing_service_deiscovery_test.go
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ "reflect"
+ "testing"
+
+ "github.com/apache/dubbo-go/metadata/mapping"
+)
+
+import (
+ gxset "github.com/dubbogo/gost/container/set"
+ gxpage "github.com/dubbogo/gost/page"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/suite"
Review comment:
split it
[GitHub] [dubbo-go] flycash merged pull request #604: Ftr: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
flycash merged pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604
[GitHub] [dubbo-go] flycash commented on a change in pull request #604: [WIP] Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
flycash commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r440124646
##########
File path: config/base_config.go
##########
@@ -64,7 +83,7 @@ func (c *BaseConfig) startConfigCenter() error {
if c.prepareEnvironment() != nil {
return perrors.WithMessagef(err, "start config center error!")
}
- //c.fresh()
+ // c.fresh()
return err
Review comment:
You deleted it, and I don't know why either. I only reformatted the comment:
![image](https://user-images.githubusercontent.com/9923838/84654927-9f16df00-af42-11ea-9a66-8a2e620363b8.png)
[GitHub] [dubbo-go] codecov-commenter edited a comment on pull request #604: Ftr: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#issuecomment-653771940
# [Codecov](https://codecov.io/gh/apache/dubbo-go/pull/604?src=pr&el=h1) Report
> Merging [#604](https://codecov.io/gh/apache/dubbo-go/pull/604?src=pr&el=desc) into [develop](https://codecov.io/gh/apache/dubbo-go/commit/57019b724c2e593a722f16a53b899954e5295ed7&el=desc) will **decrease** coverage by `3.32%`.
> The diff coverage is `53.08%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/dubbo-go/pull/604/graphs/tree.svg?width=650&height=150&src=pr&token=dcPE6RyFAL)](https://codecov.io/gh/apache/dubbo-go/pull/604?src=pr&el=tree)
```diff
@@             Coverage Diff             @@
##           develop     #604      +/-   ##
===========================================
- Coverage    67.05%   63.72%   -3.33%
===========================================
  Files          190      236      +46
  Lines         9914    12313    +2399
===========================================
+ Hits          6648     7847    +1199
- Misses        2608     3701    +1093
- Partials       658      765     +107
```
| [Impacted Files](https://codecov.io/gh/apache/dubbo-go/pull/604?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [common/config/environment.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL2NvbmZpZy9lbnZpcm9ubWVudC5nbw==) | `52.38% <0.00%> (ø)` | |
| [common/extension/metadata\_report\_factory.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL2V4dGVuc2lvbi9tZXRhZGF0YV9yZXBvcnRfZmFjdG9yeS5nbw==) | `0.00% <0.00%> (ø)` | |
| [common/extension/metadata\_service.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL2V4dGVuc2lvbi9tZXRhZGF0YV9zZXJ2aWNlLmdv) | `0.00% <0.00%> (ø)` | |
| [common/extension/metadata\_service\_proxy\_factory.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL2V4dGVuc2lvbi9tZXRhZGF0YV9zZXJ2aWNlX3Byb3h5X2ZhY3RvcnkuZ28=) | `0.00% <0.00%> (ø)` | |
| [common/extension/registry.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL2V4dGVuc2lvbi9yZWdpc3RyeS5nbw==) | `0.00% <0.00%> (ø)` | |
| [common/extension/rest\_client.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL2V4dGVuc2lvbi9yZXN0X2NsaWVudC5nbw==) | `0.00% <0.00%> (ø)` | |
| [common/extension/router\_factory.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL2V4dGVuc2lvbi9yb3V0ZXJfZmFjdG9yeS5nbw==) | `0.00% <0.00%> (ø)` | |
| [common/extension/service\_discovery.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL2V4dGVuc2lvbi9zZXJ2aWNlX2Rpc2NvdmVyeS5nbw==) | `0.00% <0.00%> (ø)` | |
| [common/extension/service\_instance\_customizer.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL2V4dGVuc2lvbi9zZXJ2aWNlX2luc3RhbmNlX2N1c3RvbWl6ZXIuZ28=) | `0.00% <0.00%> (ø)` | |
| [...mon/extension/service\_instance\_selector\_factory.go](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree#diff-Y29tbW9uL2V4dGVuc2lvbi9zZXJ2aWNlX2luc3RhbmNlX3NlbGVjdG9yX2ZhY3RvcnkuZ28=) | `0.00% <0.00%> (ø)` | |
| ... and [155 more](https://codecov.io/gh/apache/dubbo-go/pull/604/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/dubbo-go/pull/604?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/dubbo-go/pull/604?src=pr&el=footer). Last update [81026a3...2f73ae5](https://codecov.io/gh/apache/dubbo-go/pull/604?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[GitHub] [dubbo-go] fangyincheng commented on a change in pull request #604: Ftr: Application-level Registry Model
Posted by GitBox <gi...@apache.org>.
fangyincheng commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r449857260
##########
File path: registry/zookeeper/service_discovery_test.go
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+ "github.com/apache/dubbo-go/common/extension"
Review comment:
format
##########
File path: remoting/zookeeper/facade.go
##########
@@ -18,6 +18,7 @@
package zookeeper
import (
+ "github.com/apache/dubbo-go/common"
Review comment:
format
##########
File path: remoting/consul/test_agent.go
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package consul
+
+import (
+ "io/ioutil"
+ "os"
+ "strconv"
+ "testing"
+)
+
+import (
+ "github.com/hashicorp/consul/agent"
+)
+
+// Consul agent, used for test, simulates
+// an embedded consul server.
+type ConsulAgent struct {
+ dataDir string
+ testAgent *agent.TestAgent
+}
+
+func NewConsulAgent(t *testing.T, port int) *ConsulAgent {
+ dataDir, _ := ioutil.TempDir("./", "agent")
+ hcl := `
+ ports {
+ http = ` + strconv.Itoa(port) + `
+ }
+ data_dir = "` + dataDir + `"
+ `
+ testAgent := &agent.TestAgent{Name: t.Name(), DataDir: dataDir, HCL: hcl}
+ testAgent.Start(t)
+
+ consulAgent := &ConsulAgent{
+ dataDir: dataDir,
+ testAgent: testAgent,
+ }
+ return consulAgent
+}
+
+func (consulAgent *ConsulAgent) Close() error {
+ var err error
+
+ err = consulAgent.testAgent.Shutdown()
+ if err != nil {
+ return err
+ }
+
+ err = os.RemoveAll(consulAgent.dataDir)
+ if err != nil {
Review comment:
you can return err directly.