You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2022/11/29 10:37:18 UTC

[streampipes] branch dev updated: [hotfix] Increase retry time to avoid document conflict

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new f9f67c2ec [hotfix] Increase retry time to avoid document conflict
f9f67c2ec is described below

commit f9f67c2ec23e3aaf10d8900c2527b4ea9b3746f0
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Tue Nov 29 11:37:07 2022 +0100

    [hotfix] Increase retry time to avoid document conflict
---
 .../management/WorkerAdministrationManagement.java | 87 +++++++++++-----------
 1 file changed, 43 insertions(+), 44 deletions(-)

diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
index c66aef2df..9a5455391 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
@@ -6,14 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
 package org.apache.streampipes.connect.container.master.management;
@@ -23,6 +22,7 @@ import org.apache.streampipes.connect.container.master.health.AdapterOperationLo
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.storage.api.IAdapterStorage;
 import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,56 +31,55 @@ import java.util.concurrent.TimeUnit;
 
 public class WorkerAdministrationManagement {
 
-    private static final Logger LOG = LoggerFactory.getLogger(AdapterMasterManagement.class);
-    private static final int MAX_RETRIES = 3;
+  private static final Logger LOG = LoggerFactory.getLogger(AdapterMasterManagement.class);
+  private static final int MAX_RETRIES = 7;
 
-    private final IAdapterStorage adapterDescriptionStorage;
+  private final IAdapterStorage adapterDescriptionStorage;
 
-    private final AdapterHealthCheck adapterHealthCheck;
+  private final AdapterHealthCheck adapterHealthCheck;
 
-    public WorkerAdministrationManagement() {
-        this.adapterHealthCheck = new AdapterHealthCheck();
-        this.adapterDescriptionStorage = CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage();
-    }
+  public WorkerAdministrationManagement() {
+    this.adapterHealthCheck = new AdapterHealthCheck();
+    this.adapterDescriptionStorage = CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage();
+  }
 
-    public void register(List<AdapterDescription> availableAdapterDescription) {
-        List<AdapterDescription> alreadyRegisteredAdapters = this.adapterDescriptionStorage.getAllAdapters();
+  public void register(List<AdapterDescription> availableAdapterDescription) {
+    List<AdapterDescription> alreadyRegisteredAdapters = this.adapterDescriptionStorage.getAllAdapters();
 
-        availableAdapterDescription.forEach(adapterDescription -> {
+    availableAdapterDescription.forEach(adapterDescription -> {
 
-            // only install once adapter description per service group
-            boolean alreadyInstalled = alreadyRegisteredAdapters
-                    .stream()
-                    .anyMatch(a -> a.getAppId().equals(adapterDescription.getAppId()));
-            if (!alreadyInstalled) {
-                this.adapterDescriptionStorage.storeAdapter(adapterDescription);
-            }
-        });
+      // only install once adapter description per service group
+      boolean alreadyInstalled =
+          alreadyRegisteredAdapters.stream().anyMatch(a -> a.getAppId().equals(adapterDescription.getAppId()));
+      if (!alreadyInstalled) {
+        this.adapterDescriptionStorage.storeAdapter(adapterDescription);
+      }
+    });
 
-        int retryCount = 0;
-        checkAndRestore(retryCount);
-    }
+    int retryCount = 0;
+    checkAndRestore(retryCount);
+  }
 
-    private void checkAndRestore(int retryCount) {
-        if (AdapterOperationLock.INSTANCE.isLocked()) {
-            LOG.info("Adapter operation already in progress, {}/{}", (retryCount + 1), MAX_RETRIES);
-            if (retryCount < MAX_RETRIES) {
-                try {
-                    TimeUnit.MILLISECONDS.sleep(1000);
-                    retryCount++;
-                    checkAndRestore(retryCount);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            } else {
-                LOG.info("Max retries for running adapter operations reached, will do unlock which might cause conflicts...");
-                AdapterOperationLock.INSTANCE.unlock();
-                this.adapterHealthCheck.checkAndRestoreAdapters();
-            }
-        } else {
-            AdapterOperationLock.INSTANCE.lock();
-            this.adapterHealthCheck.checkAndRestoreAdapters();
-            AdapterOperationLock.INSTANCE.unlock();
+  private void checkAndRestore(int retryCount) {
+    if (AdapterOperationLock.INSTANCE.isLocked()) {
+      LOG.info("Adapter operation already in progress, {}/{}", (retryCount + 1), MAX_RETRIES);
+      if (retryCount <= MAX_RETRIES) {
+        try {
+          TimeUnit.MILLISECONDS.sleep(3000);
+          retryCount++;
+          checkAndRestore(retryCount);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
         }
+      } else {
+        LOG.info("Max retries for running adapter operations reached, will do unlock which might cause conflicts...");
+        AdapterOperationLock.INSTANCE.unlock();
+        this.adapterHealthCheck.checkAndRestoreAdapters();
+      }
+    } else {
+      AdapterOperationLock.INSTANCE.lock();
+      this.adapterHealthCheck.checkAndRestoreAdapters();
+      AdapterOperationLock.INSTANCE.unlock();
     }
+  }
 }