You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/07/29 08:03:30 UTC
[1/2] incubator-gobblin git commit: Allow disabling global
throttling. Fix a race condition in BatchedPermitsRequester.
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 725a0829d -> a080ad843
Allow disabling global throttling. Fix a race condition in BatchedPermitsRequester.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/82b5b2d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/82b5b2d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/82b5b2d1
Branch: refs/heads/master
Commit: 82b5b2d12d86ca8819a678bf3403495bfd2683e7
Parents: 5459609
Author: ibuenros <is...@gmail.com>
Authored: Wed May 17 10:18:41 2017 -0700
Committer: ibuenros <is...@gmail.com>
Committed: Wed May 17 10:18:41 2017 -0700
----------------------------------------------------------------------
.../util/limiter/BatchedPermitsRequester.java | 11 +-
.../RedirectAwareRestClientRequestSender.java | 5 +-
.../util/limiter/RestliLimiterFactoryTest.java | 121 +++++++++++++++++++
.../limiter/broker/SharedLimiterFactory.java | 14 ++-
4 files changed, 146 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/82b5b2d1/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/BatchedPermitsRequester.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/BatchedPermitsRequester.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/BatchedPermitsRequester.java
index 2e6e97e..9ea8c50 100644
--- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/BatchedPermitsRequester.java
+++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/BatchedPermitsRequester.java
@@ -105,6 +105,7 @@ class BatchedPermitsRequester {
private final RetryStatus retryStatus;
private final SynchronizedAverager permitsOutstanding;
private final long targetMillisBetweenRequests;
+ private final AtomicLong callbackCounter;
@Builder
private BatchedPermitsRequester(String resourceId, String requestorIdentifier,
@@ -131,6 +132,7 @@ class BatchedPermitsRequester {
this.restRequestTimer = metricContext == null ? null : metricContext.timer(REST_REQUEST_TIMER);
this.restRequestHistogram = metricContext == null ? null : metricContext.histogram(REST_REQUEST_PERMITS_HISTOGRAM);
+ this.callbackCounter = new AtomicLong();
}
/**
@@ -150,8 +152,13 @@ class BatchedPermitsRequester {
return true;
}
if (this.retryStatus.canRetryWithinMillis(10000)) {
+ long callbackCounterSnap = this.callbackCounter.get();
maybeSendNewPermitRequest();
- this.newPermitsAvailable.await();
+ if (this.callbackCounter.get() == callbackCounterSnap) {
+ // If a callback has happened since we tried to send the new permit request, don't await
+ // Since some request senders may be synchronous, we would have missed the notification
+ this.newPermitsAvailable.await();
+ }
} else {
break;
}
@@ -279,6 +286,7 @@ class BatchedPermitsRequester {
@Override
public void onSuccess(Response<PermitAllocation> result) {
BatchedPermitsRequester.this.retries = 0;
+ BatchedPermitsRequester.this.callbackCounter.incrementAndGet();
BatchedPermitsRequester.this.lock.lock();
try {
PermitAllocation allocation = result.getEntity();
@@ -309,6 +317,7 @@ class BatchedPermitsRequester {
private void nonRetriableFail(Throwable exc, String msg) {
BatchedPermitsRequester.this.retryStatus.blockRetries(RETRY_DELAY_ON_NON_RETRIABLE_EXCEPTION, exc);
+ BatchedPermitsRequester.this.callbackCounter.incrementAndGet();
BatchedPermitsRequester.this.requestSemaphore.release();
log.error(msg, exc);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/82b5b2d1/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java
index 752c933..f7bd631 100644
--- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java
+++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java
@@ -61,14 +61,14 @@ public class RedirectAwareRestClientRequestSender extends RestClientRequestSende
* A {@link SharedResourceFactory} that creates {@link RedirectAwareRestClientRequestSender}s.
* @param <S>
*/
- public static class Factory<S extends ScopeType<S>> implements SharedResourceFactory<RedirectAwareRestClientRequestSender, SharedRestClientKey, S> {
+ public static class Factory<S extends ScopeType<S>> implements SharedResourceFactory<RequestSender, SharedRestClientKey, S> {
@Override
public String getName() {
return SharedRestClientFactory.FACTORY_NAME;
}
@Override
- public SharedResourceFactoryResponse<RedirectAwareRestClientRequestSender> createResource(
+ public SharedResourceFactoryResponse<RequestSender> createResource(
SharedResourcesBroker<S> broker, ScopedConfigView<S, SharedRestClientKey> config)
throws NotConfiguredException {
try {
@@ -177,7 +177,6 @@ public class RedirectAwareRestClientRequestSender extends RestClientRequestSende
if (this.retries > RedirectAwareRestClientRequestSender.this.connectionPrefixes.size()) {
this.underlying.onError(new NonRetriableException("Failed to connect to all available connection prefixes."));
}
- log.info("Retries " + this.retries + " this " + hashCode());
updateRestClient(getNextConnectionPrefix(), "Failed to communicate with " + getCurrentServerPrefix());
this.exponentialBackoff.awaitNextRetry();
sendRequest(this.originalRequest, this);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/82b5b2d1/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/gobblin/util/limiter/RestliLimiterFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/gobblin/util/limiter/RestliLimiterFactoryTest.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/gobblin/util/limiter/RestliLimiterFactoryTest.java
new file mode 100644
index 0000000..951612f
--- /dev/null
+++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/gobblin/util/limiter/RestliLimiterFactoryTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package gobblin.util.limiter;
+
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.linkedin.common.callback.Callback;
+import com.linkedin.restli.client.Response;
+import com.typesafe.config.ConfigFactory;
+
+import gobblin.broker.BrokerConfigurationKeyGenerator;
+import gobblin.broker.SharedResourcesBrokerFactory;
+import gobblin.broker.SimpleScopeType;
+import gobblin.broker.iface.SharedResourcesBroker;
+import gobblin.restli.SharedRestClientKey;
+import gobblin.restli.throttling.PermitAllocation;
+import gobblin.restli.throttling.PermitRequest;
+import gobblin.util.limiter.broker.SharedLimiterFactory;
+import gobblin.util.limiter.broker.SharedLimiterKey;
+
+/** Checks whether limiters built through the broker send permit requests to the bound {@link RequestSender}. */
+public class RestliLimiterFactoryTest {
+  /** A limiter from {@link RestliLimiterFactory} sends one request to the bound sender for acquirePermits(10). */
+  @Test
+  public void testFactory() throws Exception {
+    SharedResourcesBroker<SimpleScopeType> broker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
+        ConfigFactory.empty(), SimpleScopeType.GLOBAL.defaultScopeInstance());
+
+    MyRequestSender requestSender = new MyRequestSender();
+
+    broker.bindSharedResourceAtScope(new RedirectAwareRestClientRequestSender.Factory<>(),
+        new SharedRestClientKey(RestliLimiterFactory.RESTLI_SERVICE_NAME), SimpleScopeType.GLOBAL, requestSender);
+    RestliServiceBasedLimiter limiter =
+        broker.getSharedResource(new RestliLimiterFactory<>(), new SharedLimiterKey("my/resource"));
+
+    Assert.assertNotNull(limiter.acquirePermits(10));
+    Assert.assertEquals(1, requestSender.requestList.size()); // JUnit order: (expected, actual)
+
+    broker.close();
+  }
+  /** Same check through {@link SharedLimiterFactory}: the request still reaches the bound sender exactly once. */
+  @Test
+  public void testRestliLimiterCalledByLimiterFactory() throws Exception {
+    SharedResourcesBroker<SimpleScopeType> broker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
+        ConfigFactory.empty(), SimpleScopeType.GLOBAL.defaultScopeInstance());
+
+    MyRequestSender requestSender = new MyRequestSender();
+
+    broker.bindSharedResourceAtScope(new RedirectAwareRestClientRequestSender.Factory<>(),
+        new SharedRestClientKey(RestliLimiterFactory.RESTLI_SERVICE_NAME), SimpleScopeType.GLOBAL, requestSender);
+    Limiter limiter =
+        broker.getSharedResource(new SharedLimiterFactory<>(), new SharedLimiterKey("my/resource"));
+
+    Assert.assertNotNull(limiter.acquirePermits(10));
+    Assert.assertEquals(1, requestSender.requestList.size()); // JUnit order: (expected, actual)
+
+    broker.close();
+  }
+  /** With {@link SharedLimiterFactory#SKIP_GLOBAL_LIMITER_KEY} set to "true", no request reaches the bound sender. */
+  @Test
+  public void testSkipGlobalLimiterOnLimiterFactory() throws Exception {
+    Map<String, String> configMap = ImmutableMap.of(
+        BrokerConfigurationKeyGenerator.generateKey(new SharedLimiterFactory(), null, null, SharedLimiterFactory.SKIP_GLOBAL_LIMITER_KEY), "true"
+    );
+
+    SharedResourcesBroker<SimpleScopeType> broker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
+        ConfigFactory.parseMap(configMap), SimpleScopeType.GLOBAL.defaultScopeInstance());
+
+    MyRequestSender requestSender = new MyRequestSender();
+
+    broker.bindSharedResourceAtScope(new RedirectAwareRestClientRequestSender.Factory<>(),
+        new SharedRestClientKey(RestliLimiterFactory.RESTLI_SERVICE_NAME), SimpleScopeType.GLOBAL, requestSender);
+    Limiter limiter =
+        broker.getSharedResource(new SharedLimiterFactory<>(), new SharedLimiterKey("my/resource"));
+
+    Assert.assertNotNull(limiter.acquirePermits(10));
+    Assert.assertEquals(0, requestSender.requestList.size()); // JUnit order: (expected, actual)
+
+    broker.close();
+  }
+  /** {@link RequestSender} that records each request and synchronously answers it with a successful allocation. */
+  public static class MyRequestSender implements RequestSender {
+    final List<PermitRequest> requestList = Lists.newArrayList();
+    /** Records the request, then calls onSuccess with the requested permit count and expiration Long.MAX_VALUE. */
+    @Override
+    public void sendRequest(PermitRequest request, Callback<Response<PermitAllocation>> callback) {
+      this.requestList.add(request);
+
+      PermitAllocation permitAllocation = new PermitAllocation();
+      permitAllocation.setPermits(request.getPermits());
+      permitAllocation.setExpiration(Long.MAX_VALUE);
+
+      Response<PermitAllocation> response = Mockito.mock(Response.class);
+      Mockito.when(response.getEntity()).thenReturn(permitAllocation);
+      callback.onSuccess(response);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/82b5b2d1/gobblin-utility/src/main/java/gobblin/util/limiter/broker/SharedLimiterFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/gobblin/util/limiter/broker/SharedLimiterFactory.java b/gobblin-utility/src/main/java/gobblin/util/limiter/broker/SharedLimiterFactory.java
index 0d0c5f1..0ae45a6 100644
--- a/gobblin-utility/src/main/java/gobblin/util/limiter/broker/SharedLimiterFactory.java
+++ b/gobblin-utility/src/main/java/gobblin/util/limiter/broker/SharedLimiterFactory.java
@@ -32,6 +32,7 @@ import gobblin.broker.iface.SharedResourcesBroker;
import gobblin.broker.iface.SharedResourceFactoryResponse;
import gobblin.broker.ResourceCoordinate;
import gobblin.util.ClassAliasResolver;
+import gobblin.util.ConfigUtils;
import gobblin.util.limiter.Limiter;
import gobblin.util.limiter.LimiterFactory;
import gobblin.util.limiter.MultiLimiter;
@@ -57,6 +58,11 @@ public class SharedLimiterFactory<S extends ScopeType<S>> implements SharedResou
public static final String NAME = "limiter";
public static final String LIMITER_CLASS_KEY = "class";
public static final String FAIL_IF_NO_GLOBAL_LIMITER_KEY = "failIfNoGlobalLimiter";
+ /**
+ * Skip use of global limiter. In general, this should not be used, but it is provided to easily disable global limiters
+ * in case of issues with the coordination server.
+ */
+ public static final String SKIP_GLOBAL_LIMITER_KEY = "skipGlobalLimiter";
public static final String FAIL_ON_UNKNOWN_RESOURCE_ID = "faiOnUnknownResourceId";
private static final ClassAliasResolver<LimiterFactory> RESOLVER = new ClassAliasResolver<>(LimiterFactory.class);
@@ -74,7 +80,13 @@ public class SharedLimiterFactory<S extends ScopeType<S>> implements SharedResou
Config config = configView.getConfig();
SharedLimiterKey.GlobalLimiterPolicy globalLimiterPolicy = configView.getKey().getGlobalLimiterPolicy();
- if (config.hasPath(FAIL_IF_NO_GLOBAL_LIMITER_KEY) && config.getBoolean(FAIL_IF_NO_GLOBAL_LIMITER_KEY) &&
+ if (ConfigUtils.getBoolean(config, SKIP_GLOBAL_LIMITER_KEY, false)) {
+ if (globalLimiterPolicy != SharedLimiterKey.GlobalLimiterPolicy.LOCAL_ONLY) {
+ SharedLimiterKey modifiedKey = new SharedLimiterKey(configView.getKey().getResourceLimitedPath(),
+ SharedLimiterKey.GlobalLimiterPolicy.LOCAL_ONLY);
+ return new ResourceCoordinate<>(this, modifiedKey, (S) configView.getScope());
+ }
+ } else if (config.hasPath(FAIL_IF_NO_GLOBAL_LIMITER_KEY) && config.getBoolean(FAIL_IF_NO_GLOBAL_LIMITER_KEY) &&
globalLimiterPolicy != SharedLimiterKey.GlobalLimiterPolicy.USE_GLOBAL) {
// if user has specified FAIL_IF_NO_GLOBAL_LIMITER_KEY, promote the policy from USE_GLOBAL_IF_CONFIGURED to USE_GLOBAL
// e.g. fail if no GLOBAL configuration is present
[2/2] incubator-gobblin git commit: Merge pull request #1874 from
ibuenros/allow-disable-global-throttling
Posted by ab...@apache.org.
Merge pull request #1874 from ibuenros/allow-disable-global-throttling
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a080ad84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a080ad84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a080ad84
Branch: refs/heads/master
Commit: a080ad8437041990b82eccae8a3e70c1ca200799
Parents: 725a082 82b5b2d
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Sat Jul 29 01:03:25 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Sat Jul 29 01:03:25 2017 -0700
----------------------------------------------------------------------
.../util/limiter/BatchedPermitsRequester.java | 11 +-
.../RedirectAwareRestClientRequestSender.java | 5 +-
.../util/limiter/RestliLimiterFactoryTest.java | 121 +++++++++++++++++++
.../limiter/broker/SharedLimiterFactory.java | 14 ++-
4 files changed, 146 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a080ad84/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java
----------------------------------------------------------------------