You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2022/11/29 10:37:18 UTC
[streampipes] branch dev updated: [hotfix] Increase retry time to avoid document conflict
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new f9f67c2ec [hotfix] Increase retry time to avoid document conflict
f9f67c2ec is described below
commit f9f67c2ec23e3aaf10d8900c2527b4ea9b3746f0
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Tue Nov 29 11:37:07 2022 +0100
[hotfix] Increase retry time to avoid document conflict
---
.../management/WorkerAdministrationManagement.java | 87 +++++++++++-----------
1 file changed, 43 insertions(+), 44 deletions(-)
diff --git a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
index c66aef2df..9a5455391 100644
--- a/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
+++ b/streampipes-connect-container-master/src/main/java/org/apache/streampipes/connect/container/master/management/WorkerAdministrationManagement.java
@@ -6,14 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.streampipes.connect.container.master.management;
@@ -23,6 +22,7 @@ import org.apache.streampipes.connect.container.master.health.AdapterOperationLo
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.storage.api.IAdapterStorage;
import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,56 +31,55 @@ import java.util.concurrent.TimeUnit;
public class WorkerAdministrationManagement {
- private static final Logger LOG = LoggerFactory.getLogger(AdapterMasterManagement.class);
- private static final int MAX_RETRIES = 3;
+ private static final Logger LOG = LoggerFactory.getLogger(AdapterMasterManagement.class);
+ private static final int MAX_RETRIES = 7;
- private final IAdapterStorage adapterDescriptionStorage;
+ private final IAdapterStorage adapterDescriptionStorage;
- private final AdapterHealthCheck adapterHealthCheck;
+ private final AdapterHealthCheck adapterHealthCheck;
- public WorkerAdministrationManagement() {
- this.adapterHealthCheck = new AdapterHealthCheck();
- this.adapterDescriptionStorage = CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage();
- }
+ public WorkerAdministrationManagement() {
+ this.adapterHealthCheck = new AdapterHealthCheck();
+ this.adapterDescriptionStorage = CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage();
+ }
- public void register(List<AdapterDescription> availableAdapterDescription) {
- List<AdapterDescription> alreadyRegisteredAdapters = this.adapterDescriptionStorage.getAllAdapters();
+ public void register(List<AdapterDescription> availableAdapterDescription) {
+ List<AdapterDescription> alreadyRegisteredAdapters = this.adapterDescriptionStorage.getAllAdapters();
- availableAdapterDescription.forEach(adapterDescription -> {
+ availableAdapterDescription.forEach(adapterDescription -> {
- // only install once adapter description per service group
- boolean alreadyInstalled = alreadyRegisteredAdapters
- .stream()
- .anyMatch(a -> a.getAppId().equals(adapterDescription.getAppId()));
- if (!alreadyInstalled) {
- this.adapterDescriptionStorage.storeAdapter(adapterDescription);
- }
- });
+ // only install once adapter description per service group
+ boolean alreadyInstalled =
+ alreadyRegisteredAdapters.stream().anyMatch(a -> a.getAppId().equals(adapterDescription.getAppId()));
+ if (!alreadyInstalled) {
+ this.adapterDescriptionStorage.storeAdapter(adapterDescription);
+ }
+ });
- int retryCount = 0;
- checkAndRestore(retryCount);
- }
+ int retryCount = 0;
+ checkAndRestore(retryCount);
+ }
- private void checkAndRestore(int retryCount) {
- if (AdapterOperationLock.INSTANCE.isLocked()) {
- LOG.info("Adapter operation already in progress, {}/{}", (retryCount + 1), MAX_RETRIES);
- if (retryCount < MAX_RETRIES) {
- try {
- TimeUnit.MILLISECONDS.sleep(1000);
- retryCount++;
- checkAndRestore(retryCount);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } else {
- LOG.info("Max retries for running adapter operations reached, will do unlock which might cause conflicts...");
- AdapterOperationLock.INSTANCE.unlock();
- this.adapterHealthCheck.checkAndRestoreAdapters();
- }
- } else {
- AdapterOperationLock.INSTANCE.lock();
- this.adapterHealthCheck.checkAndRestoreAdapters();
- AdapterOperationLock.INSTANCE.unlock();
+ private void checkAndRestore(int retryCount) {
+ if (AdapterOperationLock.INSTANCE.isLocked()) {
+ LOG.info("Adapter operation already in progress, {}/{}", (retryCount + 1), MAX_RETRIES);
+ if (retryCount <= MAX_RETRIES) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(3000);
+ retryCount++;
+ checkAndRestore(retryCount);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
+ } else {
+ LOG.info("Max retries for running adapter operations reached, will do unlock which might cause conflicts...");
+ AdapterOperationLock.INSTANCE.unlock();
+ this.adapterHealthCheck.checkAndRestoreAdapters();
+ }
+ } else {
+ AdapterOperationLock.INSTANCE.lock();
+ this.adapterHealthCheck.checkAndRestoreAdapters();
+ AdapterOperationLock.INSTANCE.unlock();
}
+ }
}