You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/09 16:55:30 UTC
[1/4] beam git commit: Use guava Base64 encoding instead of google api client
Repository: beam
Updated Branches:
refs/heads/release-2.0.0 e282601b7 -> 5763c384d
Use guava Base64 encoding instead of google api client
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8ea53a4b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8ea53a4b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8ea53a4b
Branch: refs/heads/release-2.0.0
Commit: 8ea53a4b087b4a4df5ff142250c1d6f4d7004009
Parents: f6a0c67
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Fri May 5 19:33:45 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue May 9 09:54:29 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/coders/StructuralByteArray.java | 4 ++--
.../org/apache/beam/sdk/testing/MatcherDeserializer.java | 4 ++--
.../org/apache/beam/sdk/testing/MatcherSerializer.java | 4 ++--
.../main/java/org/apache/beam/sdk/util/CoderUtils.java | 10 +++++++---
4 files changed, 13 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8ea53a4b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuralByteArray.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuralByteArray.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuralByteArray.java
index 226f79c..edde68e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuralByteArray.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuralByteArray.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.sdk.coders;
-import static com.google.api.client.util.Base64.encodeBase64String;
+import com.google.common.io.BaseEncoding;
import java.util.Arrays;
/**
@@ -53,6 +53,6 @@ public class StructuralByteArray {
@Override
public String toString() {
- return "base64:" + encodeBase64String(value);
+ return "base64:" + BaseEncoding.base64().encode(value);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8ea53a4b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java
index 6ca07ba..e7aa5a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java
@@ -22,7 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.api.client.util.Base64;
+import com.google.common.io.BaseEncoding;
import java.io.IOException;
import org.apache.beam.sdk.util.SerializableUtils;
@@ -36,7 +36,7 @@ class MatcherDeserializer extends JsonDeserializer<SerializableMatcher<?>> {
throws IOException, JsonProcessingException {
ObjectNode node = jsonParser.readValueAsTree();
String matcher = node.get("matcher").asText();
- byte[] in = Base64.decodeBase64(matcher);
+ byte[] in = BaseEncoding.base64().decode(matcher);
return (SerializableMatcher<?>) SerializableUtils
.deserializeFromByteArray(in, "SerializableMatcher");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8ea53a4b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
index 2b4584c..35375f6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
-import com.google.api.client.util.Base64;
+import com.google.common.io.BaseEncoding;
import java.io.IOException;
import org.apache.beam.sdk.util.SerializableUtils;
@@ -33,7 +33,7 @@ class MatcherSerializer extends JsonSerializer<SerializableMatcher<?>> {
public void serialize(SerializableMatcher<?> matcher, JsonGenerator jsonGenerator,
SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
byte[] out = SerializableUtils.serializeToByteArray(matcher);
- String encodedString = Base64.encodeBase64String(out);
+ String encodedString = BaseEncoding.base64().encode(out);
jsonGenerator.writeStartObject();
jsonGenerator.writeStringField("matcher", encodedString);
jsonGenerator.writeEndObject();
http://git-wip-us.apache.org/repos/asf/beam/blob/8ea53a4b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
index 3380a10..da77829 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
@@ -17,8 +17,9 @@
*/
package org.apache.beam.sdk.util;
-import com.google.api.client.util.Base64;
import com.google.common.base.Throwables;
+import com.google.common.io.BaseEncoding;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -26,6 +27,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.lang.ref.SoftReference;
import java.lang.reflect.ParameterizedType;
+
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -163,7 +165,7 @@ public final class CoderUtils {
public static <T> String encodeToBase64(Coder<T> coder, T value)
throws CoderException {
byte[] rawValue = encodeToByteArray(coder, value);
- return Base64.encodeBase64URLSafeString(rawValue);
+ return BaseEncoding.base64Url().omitPadding().encode(rawValue);
}
/**
@@ -171,7 +173,9 @@ public final class CoderUtils {
*/
public static <T> T decodeFromBase64(Coder<T> coder, String encodedValue) throws CoderException {
return decodeFromSafeStream(
- coder, new ByteArrayInputStream(Base64.decodeBase64(encodedValue)), Coder.Context.OUTER);
+ coder,
+ new ByteArrayInputStream(BaseEncoding.base64Url().omitPadding().decode(encodedValue)),
+ Coder.Context.OUTER);
}
/**
[2/4] beam git commit: Remove google api BackOff usage from sdks/core
Posted by lc...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index d6464dd..16bb1b4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -32,9 +32,6 @@ import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auto.value.AutoValue;
@@ -89,8 +86,11 @@ import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index ef51650..b41490f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -67,7 +67,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceIm
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.JobServiceImpl;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.apache.beam.sdk.util.BackOffAdapter;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.util.Transport;
@@ -133,7 +134,8 @@ public class BigQueryServicesImplTest {
Sleeper sleeper = new FastNanoClockAndSleeper();
JobServiceImpl.startJob(
- testJob, new ApiErrorExtractor(), bigquery, sleeper, FluentBackoff.DEFAULT.backoff());
+ testJob, new ApiErrorExtractor(), bigquery, sleeper,
+ BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff()));
verify(response, times(1)).getStatusCode();
verify(response, times(1)).getContent();
@@ -157,7 +159,8 @@ public class BigQueryServicesImplTest {
Sleeper sleeper = new FastNanoClockAndSleeper();
JobServiceImpl.startJob(
- testJob, new ApiErrorExtractor(), bigquery, sleeper, FluentBackoff.DEFAULT.backoff());
+ testJob, new ApiErrorExtractor(), bigquery, sleeper,
+ BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff()));
verify(response, times(1)).getStatusCode();
verify(response, times(1)).getContent();
@@ -185,7 +188,8 @@ public class BigQueryServicesImplTest {
Sleeper sleeper = new FastNanoClockAndSleeper();
JobServiceImpl.startJob(
- testJob, new ApiErrorExtractor(), bigquery, sleeper, FluentBackoff.DEFAULT.backoff());
+ testJob, new ApiErrorExtractor(), bigquery, sleeper,
+ BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff()));
verify(response, times(2)).getStatusCode();
verify(response, times(2)).getContent();
@@ -500,7 +504,8 @@ public class BigQueryServicesImplTest {
DatasetServiceImpl dataService =
new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
- dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
+ dataService.insertAll(ref, rows, null,
+ BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper());
verify(response, times(2)).getStatusCode();
verify(response, times(2)).getContent();
verify(response, times(2)).getContentType();
@@ -536,7 +541,8 @@ public class BigQueryServicesImplTest {
DatasetServiceImpl dataService =
new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
- dataService.insertAll(ref, rows, insertIds, TEST_BACKOFF.backoff(), new MockSleeper());
+ dataService.insertAll(ref, rows, insertIds,
+ BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper());
verify(response, times(2)).getStatusCode();
verify(response, times(2)).getContent();
verify(response, times(2)).getContentType();
@@ -577,7 +583,8 @@ public class BigQueryServicesImplTest {
// Expect it to fail.
try {
- dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
+ dataService.insertAll(ref, rows, null,
+ BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper());
fail();
} catch (IOException e) {
assertThat(e, instanceOf(IOException.class));
@@ -617,7 +624,8 @@ public class BigQueryServicesImplTest {
new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
try {
- dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
+ dataService.insertAll(ref, rows, null,
+ BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper());
fail();
} catch (RuntimeException e) {
verify(response, times(1)).getStatusCode();
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index ed6a0be..2045bb7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -66,6 +66,7 @@ import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.util.BackOffAdapter;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.Transport;
@@ -187,11 +188,12 @@ class FakeJobService implements JobService, Serializable {
public Job pollJob(JobReference jobRef, int maxAttempts)
throws InterruptedException {
BackOff backoff =
- FluentBackoff.DEFAULT
- .withMaxRetries(maxAttempts)
- .withInitialBackoff(Duration.millis(10))
- .withMaxBackoff(Duration.standardSeconds(1))
- .backoff();
+ BackOffAdapter.toGcpBackOff(
+ FluentBackoff.DEFAULT
+ .withMaxRetries(maxAttempts)
+ .withInitialBackoff(Duration.millis(10))
+ .withMaxBackoff(Duration.standardSeconds(1))
+ .backoff());
Sleeper sleeper = Sleeper.DEFAULT;
try {
do {
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
index 85c27dd..dc91638 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
@@ -26,9 +26,6 @@ import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
@@ -57,8 +54,11 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcher.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcher.java
index 2bdfffa..6dc810b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcher.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcher.java
@@ -49,6 +49,7 @@ import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.testing.SerializableMatcher;
+import org.apache.beam.sdk.util.BackOffAdapter;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Transport;
import org.hamcrest.Description;
@@ -118,7 +119,8 @@ public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult>
queryContent.setQuery(query);
response = queryWithRetries(
- bigqueryClient, queryContent, Sleeper.DEFAULT, BACKOFF_FACTORY.backoff());
+ bigqueryClient, queryContent, Sleeper.DEFAULT,
+ BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff()));
} catch (IOException | InterruptedException e) {
if (e instanceof InterruptedIOException) {
Thread.currentThread().interrupt();
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java
index 5fcdce9..2b03909 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java
@@ -36,7 +36,8 @@ import com.google.common.collect.Lists;
import java.io.IOException;
import java.math.BigInteger;
import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.apache.beam.sdk.util.BackOffAdapter;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -131,7 +132,7 @@ public class BigqueryMatcherTest {
mockBigqueryClient,
new QueryRequest(),
fastClock,
- BigqueryMatcher.BACKOFF_FACTORY.backoff());
+ BackOffAdapter.toGcpBackOff(BigqueryMatcher.BACKOFF_FACTORY.backoff()));
} finally {
verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
.query(eq(projectId), eq(new QueryRequest()));
@@ -151,7 +152,7 @@ public class BigqueryMatcherTest {
mockBigqueryClient,
new QueryRequest(),
fastClock,
- BigqueryMatcher.BACKOFF_FACTORY.backoff());
+ BackOffAdapter.toGcpBackOff(BigqueryMatcher.BACKOFF_FACTORY.backoff()));
} finally {
verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
.query(eq(projectId), eq(new QueryRequest()));
[3/4] beam git commit: Remove google api BackOff usage from sdks/core
Posted by lc...@apache.org.
Remove google api BackOff usage from sdks/core
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f6a0c674
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f6a0c674
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f6a0c674
Branch: refs/heads/release-2.0.0
Commit: f6a0c67499afa9364233db822e26c80161302fa2
Parents: e282601
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Fri May 5 19:24:51 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue May 9 09:54:29 2017 -0700
----------------------------------------------------------------------
.../beam/examples/common/ExampleUtils.java | 6 +-
.../beam/examples/WindowedWordCountIT.java | 2 +-
.../runners/dataflow/DataflowPipelineJob.java | 18 +++-
.../beam/runners/dataflow/util/PackageUtil.java | 3 +-
.../dataflow/DataflowPipelineJobTest.java | 13 ++-
.../runners/dataflow/util/PackageUtilTest.java | 2 +-
runners/spark/pom.xml | 5 -
.../beam/runners/spark/io/MicrobatchSource.java | 2 +-
sdks/java/core/pom.xml | 5 -
.../sdk/io/BoundedReadFromUnboundedSource.java | 2 +-
.../beam/sdk/testing/FileChecksumMatcher.java | 2 +-
.../java/org/apache/beam/sdk/util/BackOff.java | 81 ++++++++++++++++
.../org/apache/beam/sdk/util/BackOffUtils.java | 57 ++++++++++++
.../beam/sdk/util/ExplicitShardedFile.java | 3 -
.../org/apache/beam/sdk/util/FluentBackoff.java | 1 -
.../beam/sdk/util/NumberedShardedFile.java | 3 -
.../org/apache/beam/sdk/util/ShardedFile.java | 2 -
.../java/org/apache/beam/sdk/util/Sleeper.java | 48 ++++++++++
.../sdk/util/UploadIdResponseInterceptor.java | 60 ------------
.../org/apache/beam/SdkCoreApiSurfaceTest.java | 1 -
.../beam/sdk/testing/ExpectedLogsTest.java | 2 +-
.../sdk/testing/FastNanoClockAndSleeper.java | 47 ----------
.../testing/FastNanoClockAndSleeperTest.java | 47 ----------
.../sdk/testing/FileChecksumMatcherTest.java | 5 -
.../beam/sdk/testing/SystemNanoTimeSleeper.java | 2 +-
.../apache/beam/sdk/util/FluentBackoffTest.java | 1 -
.../beam/sdk/util/NumberedShardedFileTest.java | 14 ++-
.../util/UploadIdResponseInterceptorTest.java | 98 --------------------
.../sdk/extensions/gcp/options/GcpOptions.java | 3 +-
.../apache/beam/sdk/util/BackOffAdapter.java | 43 +++++++++
.../java/org/apache/beam/sdk/util/GcsUtil.java | 13 ++-
.../sdk/util/UploadIdResponseInterceptor.java | 60 ++++++++++++
.../beam/sdk/util/FastNanoClockAndSleeper.java | 47 ++++++++++
.../sdk/util/FastNanoClockAndSleeperTest.java | 47 ++++++++++
.../org/apache/beam/sdk/util/GcsUtilTest.java | 23 +++--
.../util/UploadIdResponseInterceptorTest.java | 98 ++++++++++++++++++++
.../io/gcp/bigquery/BigQueryServicesImpl.java | 81 +++++++---------
.../gcp/bigquery/BigQueryTableRowIterator.java | 7 +-
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 6 +-
.../gcp/bigquery/BigQueryServicesImplTest.java | 24 +++--
.../sdk/io/gcp/bigquery/FakeJobService.java | 12 ++-
.../beam/sdk/io/gcp/datastore/V1TestUtil.java | 6 +-
.../sdk/io/gcp/testing/BigqueryMatcher.java | 4 +-
.../sdk/io/gcp/testing/BigqueryMatcherTest.java | 7 +-
44 files changed, 620 insertions(+), 393 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index 409085a..6e4698f 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -20,9 +20,6 @@ package org.apache.beam.examples.common;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.Bigquery.Datasets;
import com.google.api.services.bigquery.Bigquery.Tables;
@@ -51,8 +48,11 @@ import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.util.Transport;
import org.joda.time.Duration;
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index f7e35c0..93c4543 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -19,7 +19,6 @@ package org.apache.beam.examples;
import static org.hamcrest.Matchers.equalTo;
-import com.google.api.client.util.Sleeper;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -47,6 +46,7 @@ import org.apache.beam.sdk.util.ExplicitShardedFile;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.NumberedShardedFile;
import org.apache.beam.sdk.util.ShardedFile;
+import org.apache.beam.sdk.util.Sleeper;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.joda.time.Duration;
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 23084ed..2d23983 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -46,6 +46,7 @@ import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.util.BackOffAdapter;
import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;
import org.slf4j.Logger;
@@ -285,9 +286,10 @@ public class DataflowPipelineJob implements PipelineResult {
BackOff backoff;
if (!duration.isLongerThan(Duration.ZERO)) {
- backoff = MESSAGES_BACKOFF_FACTORY.backoff();
+ backoff = BackOffAdapter.toGcpBackOff(MESSAGES_BACKOFF_FACTORY.backoff());
} else {
- backoff = MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration).backoff();
+ backoff = BackOffAdapter.toGcpBackOff(
+ MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration).backoff());
}
// This function tracks the cumulative time from the *first request* to enforce the wall-clock
@@ -299,7 +301,10 @@ public class DataflowPipelineJob implements PipelineResult {
do {
// Get the state of the job before listing messages. This ensures we always fetch job
// messages after the job finishes to ensure we have all them.
- state = getStateWithRetries(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff(), sleeper);
+ state = getStateWithRetries(
+ BackOffAdapter.toGcpBackOff(
+ STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff()),
+ sleeper);
boolean hasError = state == State.UNKNOWN;
if (messageHandler != null && !hasError) {
@@ -354,7 +359,8 @@ public class DataflowPipelineJob implements PipelineResult {
Duration consumed = Duration.millis((nanosConsumed + 999999) / 1000000);
Duration remaining = duration.minus(consumed);
if (remaining.isLongerThan(Duration.ZERO)) {
- backoff = MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(remaining).backoff();
+ backoff = BackOffAdapter.toGcpBackOff(
+ MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(remaining).backoff());
} else {
// If there is no time remaining, don't bother backing off.
backoff = BackOff.STOP_BACKOFF;
@@ -437,7 +443,9 @@ public class DataflowPipelineJob implements PipelineResult {
return terminalState;
}
- return getStateWithRetries(STATUS_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+ return getStateWithRetries(
+ BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.backoff()),
+ Sleeper.DEFAULT);
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index 5ddcd29..931f7ea 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -54,6 +54,7 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.util.BackOffAdapter;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.ZipFiles;
import org.joda.time.Duration;
@@ -210,7 +211,7 @@ class PackageUtil {
}
// Upload file, retrying on failure.
- BackOff backoff = BACKOFF_FACTORY.backoff();
+ BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
while (true) {
try {
LOG.debug("Uploading classpath element {} to {}", source, target);
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index f868a17..e95babb 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -57,8 +57,9 @@ import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.BackOffAdapter;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.util.NoopPathValidator;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
@@ -347,7 +348,10 @@ public class DataflowPipelineJobTest {
assertEquals(
State.RUNNING,
- job.getStateWithRetries(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff(), fastClock));
+ job.getStateWithRetries(
+ BackOffAdapter.toGcpBackOff(
+ DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
+ fastClock));
}
@Test
@@ -368,7 +372,10 @@ public class DataflowPipelineJobTest {
long startTime = fastClock.nanoTime();
assertEquals(
State.UNKNOWN,
- job.getStateWithRetries(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff(), fastClock));
+ job.getStateWithRetries(
+ BackOffAdapter.toGcpBackOff(
+ DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
+ fastClock));
long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
checkValidInterval(DataflowPipelineJob.STATUS_POLLING_INTERVAL,
DataflowPipelineJob.STATUS_POLLING_RETRIES, timeDiff);
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 4ae3a77..c7a660e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -79,8 +79,8 @@ import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
import org.apache.beam.sdk.testing.RegexMatcher;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
import org.apache.beam.sdk.util.MimeTypes;
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index ddec7e7..6991171 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -190,11 +190,6 @@
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
- <groupId>com.google.http-client</groupId>
- <artifactId>google-http-client</artifactId>
- <version>${google-clients.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 53d1ba7..3b48caf 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.spark.io;
-import com.google.api.client.util.BackOff;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@@ -36,6 +35,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;
import org.joda.time.Instant;
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 6620a08..a3967a2 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -125,11 +125,6 @@
<dependencies>
<dependency>
- <groupId>com.google.http-client</groupId>
- <artifactId>google-http-client</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index d9adf92..c882447 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io;
-import com.google.api.client.util.BackOff;
import com.google.auto.value.AutoValue;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
@@ -35,6 +34,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.values.PBegin;
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
index 82a6b71..5ed0525 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
@@ -21,7 +21,6 @@ package org.apache.beam.sdk.testing;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.api.client.util.Sleeper;
import com.google.common.base.Strings;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
@@ -34,6 +33,7 @@ import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.NumberedShardedFile;
import org.apache.beam.sdk.util.ShardedFile;
+import org.apache.beam.sdk.util.Sleeper;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.joda.time.Duration;
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java
new file mode 100644
index 0000000..5bc6027
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.IOException;
+
+/**
+ * Back-off policy when retrying an operation.
+ *
+ * <p><b>Note</b>: This interface is copied from Google API client library to avoid its dependency.
+ */
+public interface BackOff {
+
+ /** Value returned by {@link #nextBackOffMillis()} to indicate that no more retries should be made. */
+ long STOP = -1L;
+
+ /** Reset to initial state. */
+ void reset() throws IOException;
+
+ /**
+ * Gets the number of milliseconds to wait before retrying the operation or {@link #STOP} to
+ * indicate that no retries should be made.
+ *
+ * <p>Example usage:
+ *
+ * <pre>
+ long backOffMillis = backOff.nextBackOffMillis();
+ if (backOffMillis == BackOff.STOP) {
+ // do not retry operation
+ } else {
+ // sleep for backOffMillis milliseconds and retry operation
+ }
+ * </pre>
+ *
+ * @return the back-off delay in milliseconds, or {@link #STOP} if no retry should be made
+ */
+ long nextBackOffMillis() throws IOException;
+
+ /**
+ * Fixed back-off policy whose back-off time is always zero, meaning that the operation is retried
+ * immediately without waiting.
+ */
+ BackOff ZERO_BACKOFF = new BackOff() {
+
+ public void reset() throws IOException {
+ }
+
+ public long nextBackOffMillis() throws IOException {
+ return 0;
+ }
+ };
+
+ /**
+ * Fixed back-off policy that always returns {@link #STOP} for {@link #nextBackOffMillis()},
+ * meaning that the operation should not be retried.
+ */
+ BackOff STOP_BACKOFF = new BackOff() {
+
+ public void reset() throws IOException {
+ }
+
+ public long nextBackOffMillis() throws IOException {
+ return STOP;
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java
new file mode 100644
index 0000000..aa7461c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import java.io.IOException;
+
+/**
+ * Utilities for {@link BackOff}.
+ *
+ * <p><b>Note</b>: This is copied from Google API client library to avoid its dependency.
+ */
+public final class BackOffUtils {
+
+ /**
+ * Runs the next iteration of the back-off policy, and returns whether to continue to retry the
+ * operation.
+ *
+ * <p>When this returns {@code true}, {@link Sleeper#sleep(long)} has already been called with
+ * the number of milliseconds obtained from {@link BackOff#nextBackOffMillis()}.
+ *
+ * @param sleeper sleeper used to wait for the back-off interval
+ * @param backOff back-off policy
+ * @return whether to continue to back off; in other words, whether
+ * {@link BackOff#nextBackOffMillis()} did not return {@link BackOff#STOP}
+ * @throws InterruptedException if {@link Sleeper#sleep(long)} is interrupted
+ * @throws IOException if {@link BackOff#nextBackOffMillis()} throws it
+ */
+ public static boolean next(Sleeper sleeper, BackOff backOff)
+ throws InterruptedException, IOException {
+ long backOffTime = backOff.nextBackOffMillis();
+ // STOP signals that no more retries should be made, so return without sleeping.
+ if (backOffTime == BackOff.STOP) {
+ return false;
+ }
+ sleeper.sleep(backOffTime);
+ return true;
+ }
+
+ private BackOffUtils() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
index 0f184de..50e5ed1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
@@ -18,9 +18,6 @@
package org.apache.beam.sdk.util;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java
index 479d7a8..468b742 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.util;
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.api.client.util.BackOff;
import com.google.common.base.MoreObjects;
import org.joda.time.Duration;
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
index e18dd96..8889358 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
@@ -21,9 +21,6 @@ package org.apache.beam.sdk.util;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java
index ec9ed64..5961c4d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java
@@ -18,8 +18,6 @@
package org.apache.beam.sdk.util;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.Sleeper;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Sleeper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Sleeper.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Sleeper.java
new file mode 100644
index 0000000..d180ec6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Sleeper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+/**
+ * Sleeper interface to use for requesting the current thread to sleep as specified in
+ * {@link Thread#sleep(long)}.
+ *
+ * <p>The default implementation can be accessed at {@link #DEFAULT}. This abstraction is
+ * primarily useful for testing, where an implementation that does not actually sleep can be
+ * substituted.
+ *
+ * <p><b>Note</b>: This interface is copied from Google API client library to avoid its dependency.
+ */
+public interface Sleeper {
+
+ /**
+ * Causes the currently executing thread to sleep (temporarily cease execution) for the specified
+ * number of milliseconds as specified in {@link Thread#sleep(long)}.
+ *
+ * @param millis length of time to sleep in milliseconds
+ * @throws InterruptedException if any thread has interrupted the current thread
+ */
+ void sleep(long millis) throws InterruptedException;
+
+ /** Default implementation, which delegates directly to {@link Thread#sleep(long)}. */
+ Sleeper DEFAULT = new Sleeper() {
+
+ public void sleep(long millis) throws InterruptedException {
+ Thread.sleep(millis);
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java
deleted file mode 100644
index f685b69..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.api.client.http.GenericUrl;
-import com.google.api.client.http.HttpResponse;
-import com.google.api.client.http.HttpResponseInterceptor;
-import java.io.IOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements a response intercepter that logs the upload id if the upload
- * id header exists and it is the first request (does not have upload_id parameter in the request).
- * Only logs if debug level is enabled.
- */
-public class UploadIdResponseInterceptor implements HttpResponseInterceptor {
-
- private static final Logger LOG = LoggerFactory.getLogger(UploadIdResponseInterceptor.class);
- private static final String UPLOAD_ID_PARAM = "upload_id";
- private static final String UPLOAD_TYPE_PARAM = "uploadType";
- private static final String UPLOAD_HEADER = "X-GUploader-UploadID";
-
- @Override
- public void interceptResponse(HttpResponse response) throws IOException {
- if (!LOG.isDebugEnabled()) {
- return;
- }
- String uploadId = response.getHeaders().getFirstHeaderStringValue(UPLOAD_HEADER);
- if (uploadId == null) {
- return;
- }
-
- GenericUrl url = response.getRequest().getUrl();
- // The check for no upload id limits the output to one log line per upload.
- // The check for upload type makes sure this is an upload and not a read.
- if (url.get(UPLOAD_ID_PARAM) == null && url.get(UPLOAD_TYPE_PARAM) != null) {
- LOG.debug(
- "Upload ID for url {} on worker {} is {}",
- url,
- System.getProperty("worker_id"),
- uploadId);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
index 26dd9f9..080f34a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
@@ -38,7 +38,6 @@ public class SdkCoreApiSurfaceTest {
final Set<String> allowed =
ImmutableSet.of(
"org.apache.beam",
- "com.google.api.client",
"com.fasterxml.jackson.annotation",
"com.fasterxml.jackson.core",
"com.fasterxml.jackson.databind",
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
index 1762d0d..d307bed 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
@@ -38,7 +38,7 @@ import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** Tests for {@link FastNanoClockAndSleeper}. */
+/** Tests for {@link ExpectedLogs}. */
@RunWith(JUnit4.class)
public class ExpectedLogsTest {
private static final Logger LOG = LoggerFactory.getLogger(ExpectedLogsTest.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeper.java
deleted file mode 100644
index 6bfafa5..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeper.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.testing;
-
-import com.google.api.client.util.NanoClock;
-import com.google.api.client.util.Sleeper;
-import org.junit.rules.ExternalResource;
-import org.junit.rules.TestRule;
-
-/**
- * This object quickly moves time forward based upon how much it has been asked to sleep,
- * without actually sleeping, to simulate the backoff.
- */
-public class FastNanoClockAndSleeper extends ExternalResource
- implements NanoClock, Sleeper, TestRule {
- private long fastNanoTime;
-
- @Override
- public long nanoTime() {
- return fastNanoTime;
- }
-
- @Override
- protected void before() throws Throwable {
- fastNanoTime = NanoClock.SYSTEM.nanoTime();
- }
-
- @Override
- public void sleep(long millis) throws InterruptedException {
- fastNanoTime += millis * 1000000L;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeperTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeperTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeperTest.java
deleted file mode 100644
index 7d20951..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeperTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.testing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.TimeUnit;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link FastNanoClockAndSleeper}. */
-@RunWith(JUnit4.class)
-public class FastNanoClockAndSleeperTest {
- @Rule public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
-
- @Test
- public void testClockAndSleeper() throws Exception {
- long sleepTimeMs = TimeUnit.SECONDS.toMillis(30);
- long sleepTimeNano = TimeUnit.MILLISECONDS.toNanos(sleepTimeMs);
- long fakeTimeNano = fastNanoClockAndSleeper.nanoTime();
- long startTimeNano = System.nanoTime();
- fastNanoClockAndSleeper.sleep(sleepTimeMs);
- long maxTimeNano = startTimeNano + TimeUnit.SECONDS.toNanos(1);
- // Verify that actual time didn't progress as much as was requested
- assertTrue(System.nanoTime() < maxTimeNano);
- // Verify that the fake time did go up by the amount requested
- assertEquals(fakeTimeNano + sleepTimeNano, fastNanoClockAndSleeper.nanoTime());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
index 4ee6750..80f02e0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.testing;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
-import com.google.api.client.util.BackOff;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
@@ -43,14 +42,10 @@ public class FileChecksumMatcherTest {
public TemporaryFolder tmpFolder = new TemporaryFolder();
@Rule
public ExpectedException thrown = ExpectedException.none();
- @Rule
- public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
@Mock
private PipelineResult pResult = Mockito.mock(PipelineResult.class);
- private BackOff backOff = FileChecksumMatcher.BACK_OFF_FACTORY.backoff();
-
@Test
public void testPreconditionChecksumIsNull() throws IOException {
String tmpPath = tmpFolder.newFile().getPath();
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
index 3677e84..dd57669 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
@@ -17,9 +17,9 @@
*/
package org.apache.beam.sdk.testing;
-import com.google.api.client.util.Sleeper;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
+import org.apache.beam.sdk.util.Sleeper;
/**
* This class provides an expensive sleeper to deal with issues around Java's
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java
index 20b03cf..e810278 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java
@@ -26,7 +26,6 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
-import com.google.api.client.util.BackOff;
import java.io.IOException;
import org.joda.time.Duration;
import org.junit.Rule;
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
index 43a9166..cf8c722 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
@@ -25,16 +25,13 @@ import static org.mockito.Matchers.anyCollection;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
-import com.google.api.client.util.BackOff;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;
-import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.LocalResources;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -42,17 +39,18 @@ import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
/** Tests for {@link NumberedShardedFile}. */
@RunWith(JUnit4.class)
public class NumberedShardedFileTest {
@Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
@Rule public ExpectedException thrown = ExpectedException.none();
- @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
-
- @Mock private PipelineResult pResult = Mockito.mock(PipelineResult.class);
+ private Sleeper fastClock = new Sleeper() {
+ @Override
+ public void sleep(long millis) throws InterruptedException {
+ // No sleep.
+ }
+ };
private final BackOff backOff = NumberedShardedFile.BACK_OFF_FACTORY.backoff();
private String filePattern;
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java
deleted file mode 100644
index 8b9f77e..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.api.client.http.GenericUrl;
-import com.google.api.client.http.HttpResponse;
-import com.google.api.client.testing.http.HttpTesting;
-import com.google.api.client.testing.http.MockHttpTransport;
-import com.google.api.client.testing.http.MockLowLevelHttpResponse;
-import java.io.IOException;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * A test for {@link UploadIdResponseInterceptor}.
- */
-
-@RunWith(JUnit4.class)
-public class UploadIdResponseInterceptorTest {
-
- @Rule public ExpectedException expectedException = ExpectedException.none();
- // Note that expected logs also turns on debug logging.
- @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(UploadIdResponseInterceptor.class);
-
- /**
- * Builds a HttpResponse with the given string response.
- *
- * @param header header value to provide or null if none.
- * @param uploadId upload id to provide in the url upload id param or null if none.
- * @param uploadType upload type to provide in url upload type param or null if none.
- * @return HttpResponse with the given parameters
- * @throws IOException
- */
- private HttpResponse buildHttpResponse(String header, String uploadId, String uploadType)
- throws IOException {
- MockHttpTransport.Builder builder = new MockHttpTransport.Builder();
- MockLowLevelHttpResponse resp = new MockLowLevelHttpResponse();
- builder.setLowLevelHttpResponse(resp);
- resp.setStatusCode(200);
- GenericUrl url = new GenericUrl(HttpTesting.SIMPLE_URL);
- if (header != null) {
- resp.addHeader("X-GUploader-UploadID", header);
- }
- if (uploadId != null) {
- url.put("upload_id", uploadId);
- }
- if (uploadType != null) {
- url.put("uploadType", uploadType);
- }
- return builder.build().createRequestFactory().buildGetRequest(url).execute();
- }
-
- /**
- * Tests the responses that should not log.
- */
- @Test
- public void testResponseNoLogging() throws IOException {
- new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, null, null));
- new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", "a", null));
- new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, "h", null));
- new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", null, null));
- new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, null, "type"));
- new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", "a", "type"));
- new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, "h", "type"));
- expectedLogs.verifyNotLogged("");
- }
-
- /**
- * Check that a response logs with the correct log.
- */
- @Test
- public void testResponseLogs() throws IOException {
- new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("abc", null, "type"));
- GenericUrl url = new GenericUrl(HttpTesting.SIMPLE_URL);
- url.put("uploadType", "type");
- String worker = System.getProperty("worker_id");
- expectedLogs.verifyDebug("Upload ID for url " + url + " on worker " + worker + " is abc");
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
index a4128e8..985520f 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.BackOffAdapter;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
@@ -319,7 +320,7 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
return getProjectNumber(
projectId,
crmClient,
- BACKOFF_FACTORY.backoff(),
+ BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff()),
Sleeper.DEFAULT);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/BackOffAdapter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/BackOffAdapter.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/BackOffAdapter.java
new file mode 100644
index 0000000..e5a0a6e
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/BackOffAdapter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.IOException;
+
+/**
+ * An adapter for converting between Apache Beam and Google API client representations of backoffs.
+ */
+public class BackOffAdapter {
+ /**
+ * Returns an adapter to convert from {@link BackOff} to
+ * {@link com.google.api.client.util.BackOff}.
+ */
+ public static com.google.api.client.util.BackOff toGcpBackOff(final BackOff backOff) {
+ return new com.google.api.client.util.BackOff() {
+ @Override
+ public void reset() throws IOException {
+ backOff.reset();
+ }
+
+ @Override
+ public long nextBackOffMillis() throws IOException {
+ return backOff.nextBackOffMillis();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index ee2e231..18e3e2b 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -306,6 +306,9 @@ public class GcsUtil {
return uploadBufferSizeBytes;
}
+ private static BackOff createBackOff() {
+ return BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
+ }
/**
* Returns the file size from GCS or throws {@link FileNotFoundException}
* if the resource does not exist.
@@ -318,7 +321,7 @@ public class GcsUtil {
* Returns the {@link StorageObject} for the given {@link GcsPath}.
*/
public StorageObject getObject(GcsPath gcsPath) throws IOException {
- return getObject(gcsPath, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+ return getObject(gcsPath, createBackOff(), Sleeper.DEFAULT);
}
@VisibleForTesting
@@ -377,7 +380,7 @@ public class GcsUtil {
try {
return ResilientOperation.retry(
ResilientOperation.getGoogleRequestCallable(listObject),
- BACKOFF_FACTORY.backoff(),
+ createBackOff(),
RetryDeterminer.SOCKET_ERRORS,
IOException.class);
} catch (Exception e) {
@@ -469,7 +472,7 @@ public class GcsUtil {
public boolean bucketAccessible(GcsPath path) throws IOException {
return bucketAccessible(
path,
- BACKOFF_FACTORY.backoff(),
+ createBackOff(),
Sleeper.DEFAULT);
}
@@ -482,7 +485,7 @@ public class GcsUtil {
public long bucketOwner(GcsPath path) throws IOException {
return getBucket(
path,
- BACKOFF_FACTORY.backoff(),
+ createBackOff(),
Sleeper.DEFAULT).getProjectNumber().longValue();
}
@@ -492,7 +495,7 @@ public class GcsUtil {
*/
public void createBucket(String projectId, Bucket bucket) throws IOException {
createBucket(
- projectId, bucket, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+ projectId, bucket, createBackOff(), Sleeper.DEFAULT);
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java
new file mode 100644
index 0000000..f685b69
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpResponseInterceptor;
+import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a response interceptor that logs the upload id if the upload
+ * id header exists and it is the first request (does not have upload_id parameter in the request).
+ * Only logs if debug level is enabled.
+ */
+public class UploadIdResponseInterceptor implements HttpResponseInterceptor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UploadIdResponseInterceptor.class);
+ private static final String UPLOAD_ID_PARAM = "upload_id";
+ private static final String UPLOAD_TYPE_PARAM = "uploadType";
+ private static final String UPLOAD_HEADER = "X-GUploader-UploadID";
+
+ @Override
+ public void interceptResponse(HttpResponse response) throws IOException {
+ if (!LOG.isDebugEnabled()) {
+ return;
+ }
+ String uploadId = response.getHeaders().getFirstHeaderStringValue(UPLOAD_HEADER);
+ if (uploadId == null) {
+ return;
+ }
+
+ GenericUrl url = response.getRequest().getUrl();
+ // The check for no upload id limits the output to one log line per upload.
+ // The check for upload type makes sure this is an upload and not a read.
+ if (url.get(UPLOAD_ID_PARAM) == null && url.get(UPLOAD_TYPE_PARAM) != null) {
+ LOG.debug(
+ "Upload ID for url {} on worker {} is {}",
+ url,
+ System.getProperty("worker_id"),
+ uploadId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java
new file mode 100644
index 0000000..f1392d7
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.util.NanoClock;
+import com.google.api.client.util.Sleeper;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TestRule;
+
+/**
+ * This object quickly moves time forward based upon how much it has been asked to sleep,
+ * without actually sleeping, to simulate the backoff.
+ */
+public class FastNanoClockAndSleeper extends ExternalResource
+ implements NanoClock, Sleeper, TestRule {
+ private long fastNanoTime;
+
+ @Override
+ public long nanoTime() {
+ return fastNanoTime;
+ }
+
+ @Override
+ protected void before() throws Throwable {
+ fastNanoTime = SYSTEM.nanoTime();
+ }
+
+ @Override
+ public void sleep(long millis) throws InterruptedException {
+ fastNanoTime += millis * 1000000L;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java
new file mode 100644
index 0000000..03f9588
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FastNanoClockAndSleeper}. */
+@RunWith(JUnit4.class)
+public class FastNanoClockAndSleeperTest {
+ @Rule public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+
+ @Test
+ public void testClockAndSleeper() throws Exception {
+ long sleepTimeMs = TimeUnit.SECONDS.toMillis(30);
+ long sleepTimeNano = TimeUnit.MILLISECONDS.toNanos(sleepTimeMs);
+ long fakeTimeNano = fastNanoClockAndSleeper.nanoTime();
+ long startTimeNano = System.nanoTime();
+ fastNanoClockAndSleeper.sleep(sleepTimeMs);
+ long maxTimeNano = startTimeNano + TimeUnit.SECONDS.toNanos(1);
+ // Verify that actual time didn't progress as much as was requested
+ assertTrue(System.nanoTime() < maxTimeNano);
+ // Verify that the fake time did go up by the amount requested
+ assertEquals(fakeTimeNano + sleepTimeNano, fastNanoClockAndSleeper.nanoTime());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
index 6ffcaeb..0af584e 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
@@ -76,7 +76,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.junit.Rule;
@@ -374,7 +373,8 @@ public class GcsUtilTest {
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
- BackOff mockBackOff = FluentBackoff.DEFAULT.withMaxRetries(2).backoff();
+ BackOff mockBackOff = BackOffAdapter.toGcpBackOff(
+ FluentBackoff.DEFAULT.withMaxRetries(2).backoff());
when(mockStorage.objects()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet);
@@ -491,7 +491,7 @@ public class GcsUtilTest {
Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class);
- BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+ BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
when(mockStorage.buckets()).thenReturn(mockStorageObjects);
when(mockStorageObjects.insert(
@@ -514,7 +514,7 @@ public class GcsUtilTest {
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class);
- BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+ BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
GoogleJsonResponseException expectedException =
googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
"Waves hand mysteriously", "These aren't the buckets you're looking for");
@@ -541,7 +541,8 @@ public class GcsUtilTest {
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
- BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+ BackOff mockBackOff = BackOffAdapter.toGcpBackOff(
+ FluentBackoff.DEFAULT.backoff());
when(mockStorage.buckets()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
@@ -564,7 +565,8 @@ public class GcsUtilTest {
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
- BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+ BackOff mockBackOff = BackOffAdapter.toGcpBackOff(
+ FluentBackoff.DEFAULT.backoff());
GoogleJsonResponseException expectedException =
googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
"Waves hand mysteriously", "These aren't the buckets you're looking for");
@@ -589,7 +591,8 @@ public class GcsUtilTest {
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
- BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+ BackOff mockBackOff = BackOffAdapter.toGcpBackOff(
+ FluentBackoff.DEFAULT.backoff());
when(mockStorage.buckets()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
@@ -612,7 +615,8 @@ public class GcsUtilTest {
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
- BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+ BackOff mockBackOff = BackOffAdapter.toGcpBackOff(
+ FluentBackoff.DEFAULT.backoff());
when(mockStorage.buckets()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
@@ -635,7 +639,8 @@ public class GcsUtilTest {
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
- BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+ BackOff mockBackOff = BackOffAdapter.toGcpBackOff(
+ FluentBackoff.DEFAULT.backoff());
when(mockStorage.buckets()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java
new file mode 100644
index 0000000..8b9f77e
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.testing.http.HttpTesting;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpResponse;
+import java.io.IOException;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * A test for {@link UploadIdResponseInterceptor}.
+ */
+
+@RunWith(JUnit4.class)
+public class UploadIdResponseInterceptorTest {
+
+ @Rule public ExpectedException expectedException = ExpectedException.none();
+ // Note that expected logs also turns on debug logging.
+ @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(UploadIdResponseInterceptor.class);
+
+ /**
+ * Builds a HttpResponse with the given string response.
+ *
+ * @param header header value to provide or null if none.
+ * @param uploadId upload id to provide in the url upload id param or null if none.
+ * @param uploadType upload type to provide in url upload type param or null if none.
+ * @return HttpResponse with the given parameters
+ * @throws IOException
+ */
+ private HttpResponse buildHttpResponse(String header, String uploadId, String uploadType)
+ throws IOException {
+ MockHttpTransport.Builder builder = new MockHttpTransport.Builder();
+ MockLowLevelHttpResponse resp = new MockLowLevelHttpResponse();
+ builder.setLowLevelHttpResponse(resp);
+ resp.setStatusCode(200);
+ GenericUrl url = new GenericUrl(HttpTesting.SIMPLE_URL);
+ if (header != null) {
+ resp.addHeader("X-GUploader-UploadID", header);
+ }
+ if (uploadId != null) {
+ url.put("upload_id", uploadId);
+ }
+ if (uploadType != null) {
+ url.put("uploadType", uploadType);
+ }
+ return builder.build().createRequestFactory().buildGetRequest(url).execute();
+ }
+
+ /**
+ * Tests the responses that should not log.
+ */
+ @Test
+ public void testResponseNoLogging() throws IOException {
+ new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, null, null));
+ new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", "a", null));
+ new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, "h", null));
+ new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", null, null));
+ new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, null, "type"));
+ new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", "a", "type"));
+ new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, "h", "type"));
+ expectedLogs.verifyNotLogged("");
+ }
+
+ /**
+ * Check that a response logs with the correct log.
+ */
+ @Test
+ public void testResponseLogs() throws IOException {
+ new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("abc", null, "type"));
+ GenericUrl url = new GenericUrl(HttpTesting.SIMPLE_URL);
+ url.put("uploadType", "type");
+ String worker = System.getProperty("worker_id");
+ expectedLogs.verifyDebug("Upload ID for url " + url + " on worker " + worker + " is abc");
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index b348abd..5d5a519 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -65,6 +65,7 @@ import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.BackOffAdapter;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.util.Transport;
@@ -93,6 +94,9 @@ class BigQueryServicesImpl implements BigQueryServices {
// The initial backoff for polling the status of a BigQuery job.
private static final Duration INITIAL_JOB_STATUS_POLL_BACKOFF = Duration.standardSeconds(1);
+ private static final FluentBackoff DEFAULT_BACKOFF_FACTORY =
+ FluentBackoff.DEFAULT.withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF);
+
@Override
public JobService getJobService(BigQueryOptions options) {
return new JobServiceImpl(options);
@@ -114,6 +118,10 @@ class BigQueryServicesImpl implements BigQueryServices {
return BigQueryJsonReaderImpl.fromQuery(bqOptions, projectId, queryConfig);
}
+ private static BackOff createDefaultBackoff() {
+ return BackOffAdapter.toGcpBackOff(DEFAULT_BACKOFF_FACTORY.backoff());
+ }
+
@VisibleForTesting
static class JobServiceImpl implements BigQueryServices.JobService {
private final ApiErrorExtractor errorExtractor;
@@ -205,10 +213,7 @@ class BigQueryServicesImpl implements BigQueryServices {
private static void startJob(Job job,
ApiErrorExtractor errorExtractor,
Bigquery client) throws IOException, InterruptedException {
- BackOff backoff =
- FluentBackoff.DEFAULT
- .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
- startJob(job, errorExtractor, client, Sleeper.DEFAULT, backoff);
+ startJob(job, errorExtractor, client, Sleeper.DEFAULT, createDefaultBackoff());
}
@VisibleForTesting
@@ -249,11 +254,12 @@ class BigQueryServicesImpl implements BigQueryServices {
@Override
public Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException {
BackOff backoff =
- FluentBackoff.DEFAULT
- .withMaxRetries(maxAttempts)
- .withInitialBackoff(INITIAL_JOB_STATUS_POLL_BACKOFF)
- .withMaxBackoff(Duration.standardMinutes(1))
- .backoff();
+ BackOffAdapter.toGcpBackOff(
+ FluentBackoff.DEFAULT
+ .withMaxRetries(maxAttempts)
+ .withInitialBackoff(INITIAL_JOB_STATUS_POLL_BACKOFF)
+ .withMaxBackoff(Duration.standardMinutes(1))
+ .backoff());
return pollJob(jobRef, Sleeper.DEFAULT, backoff);
}
@@ -299,16 +305,13 @@ class BigQueryServicesImpl implements BigQueryServices {
.setConfiguration(new JobConfiguration()
.setQuery(queryConfig)
.setDryRun(true));
- BackOff backoff =
- FluentBackoff.DEFAULT
- .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
return executeWithRetries(
client.jobs().insert(projectId, job),
String.format(
"Unable to dry run query: %s, aborting after %d retries.",
queryConfig, MAX_RPC_RETRIES),
Sleeper.DEFAULT,
- backoff,
+ createDefaultBackoff(),
ALWAYS_RETRY).getStatistics();
}
@@ -321,10 +324,7 @@ class BigQueryServicesImpl implements BigQueryServices {
*/
@Override
public Job getJob(JobReference jobRef) throws IOException, InterruptedException {
- BackOff backoff =
- FluentBackoff.DEFAULT
- .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
- return getJob(jobRef, Sleeper.DEFAULT, backoff);
+ return getJob(jobRef, Sleeper.DEFAULT, createDefaultBackoff());
}
@VisibleForTesting
@@ -371,7 +371,7 @@ class BigQueryServicesImpl implements BigQueryServices {
FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);
// A backoff for rate limit exceeded errors. Retries forever.
- private static final FluentBackoff DEFAULT_BACKOFF_FACTORY =
+ private static final FluentBackoff RATE_LIMIT_BACKOFF_FACTORY =
FluentBackoff.DEFAULT
.withInitialBackoff(Duration.standardSeconds(1))
.withMaxBackoff(Duration.standardMinutes(2));
@@ -420,10 +420,7 @@ class BigQueryServicesImpl implements BigQueryServices {
@Nullable
public Table getTable(TableReference tableRef)
throws IOException, InterruptedException {
- BackOff backoff =
- FluentBackoff.DEFAULT
- .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
- return getTable(tableRef, backoff, Sleeper.DEFAULT);
+ return getTable(tableRef, createDefaultBackoff(), Sleeper.DEFAULT);
}
@VisibleForTesting
@@ -528,9 +525,6 @@ class BigQueryServicesImpl implements BigQueryServices {
*/
@Override
public void deleteTable(TableReference tableRef) throws IOException, InterruptedException {
- BackOff backoff =
- FluentBackoff.DEFAULT
- .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
executeWithRetries(
client.tables().delete(
tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()),
@@ -538,16 +532,13 @@ class BigQueryServicesImpl implements BigQueryServices {
"Unable to delete table: %s, aborting after %d retries.",
tableRef.getTableId(), MAX_RPC_RETRIES),
Sleeper.DEFAULT,
- backoff,
+ createDefaultBackoff(),
ALWAYS_RETRY);
}
@Override
public boolean isTableEmpty(TableReference tableRef) throws IOException, InterruptedException {
- BackOff backoff =
- FluentBackoff.DEFAULT
- .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
- return isTableEmpty(tableRef, backoff, Sleeper.DEFAULT);
+ return isTableEmpty(tableRef, createDefaultBackoff(), Sleeper.DEFAULT);
}
@VisibleForTesting
@@ -575,16 +566,13 @@ class BigQueryServicesImpl implements BigQueryServices {
@Override
public Dataset getDataset(String projectId, String datasetId)
throws IOException, InterruptedException {
- BackOff backoff =
- FluentBackoff.DEFAULT
- .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
return executeWithRetries(
client.datasets().get(projectId, datasetId),
String.format(
"Unable to get dataset: %s, aborting after %d retries.",
datasetId, MAX_RPC_RETRIES),
Sleeper.DEFAULT,
- backoff,
+ createDefaultBackoff(),
DONT_RETRY_NOT_FOUND);
}
@@ -599,10 +587,8 @@ class BigQueryServicesImpl implements BigQueryServices {
public void createDataset(
String projectId, String datasetId, @Nullable String location, @Nullable String description)
throws IOException, InterruptedException {
- BackOff backoff =
- FluentBackoff.DEFAULT
- .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
- createDataset(projectId, datasetId, location, description, Sleeper.DEFAULT, backoff);
+ createDataset(
+ projectId, datasetId, location, description, Sleeper.DEFAULT, createDefaultBackoff());
}
private void createDataset(
@@ -659,16 +645,13 @@ class BigQueryServicesImpl implements BigQueryServices {
@Override
public void deleteDataset(String projectId, String datasetId)
throws IOException, InterruptedException {
- BackOff backoff =
- FluentBackoff.DEFAULT
- .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
executeWithRetries(
client.datasets().delete(projectId, datasetId),
String.format(
"Unable to delete table: %s, aborting after %d retries.",
datasetId, MAX_RPC_RETRIES),
Sleeper.DEFAULT,
- backoff,
+ createDefaultBackoff(),
ALWAYS_RETRY);
}
@@ -725,7 +708,9 @@ class BigQueryServicesImpl implements BigQueryServices {
executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() {
@Override
public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
- BackOff backoff = DEFAULT_BACKOFF_FACTORY.backoff();
+ // A backoff for rate limit exceeded errors. Retries forever.
+ BackOff backoff = BackOffAdapter.toGcpBackOff(
+ RATE_LIMIT_BACKOFF_FACTORY.backoff());
while (true) {
try {
return insert.execute().getInsertErrors();
@@ -811,7 +796,10 @@ class BigQueryServicesImpl implements BigQueryServices {
TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
throws IOException, InterruptedException {
return insertAll(
- ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+ ref, rowList, insertIdList,
+ BackOffAdapter.toGcpBackOff(
+ INSERT_BACKOFF_FACTORY.backoff()),
+ Sleeper.DEFAULT);
}
@@ -822,9 +810,6 @@ class BigQueryServicesImpl implements BigQueryServices {
Table table = new Table();
table.setDescription(tableDescription);
- BackOff backoff =
- FluentBackoff.DEFAULT
- .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
return executeWithRetries(
client.tables().patch(
tableReference.getProjectId(),
@@ -835,7 +820,7 @@ class BigQueryServicesImpl implements BigQueryServices {
"Unable to patch table description: %s, aborting after %d retries.",
tableReference, MAX_RPC_RETRIES),
Sleeper.DEFAULT,
- backoff,
+ createDefaultBackoff(),
ALWAYS_RETRY);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
index 59f2bb6..ba19cf0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
@@ -57,6 +57,8 @@ import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.util.BackOffAdapter;
import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;
import org.slf4j.Logger;
@@ -453,8 +455,9 @@ class BigQueryTableRowIterator implements AutoCloseable {
throws IOException, InterruptedException {
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backOff =
- FluentBackoff.DEFAULT
- .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_TIME).backoff();
+ BackOffAdapter.toGcpBackOff(
+ FluentBackoff.DEFAULT
+ .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_TIME).backoff());
T result = null;
while (true) {
[4/4] beam git commit: [BEAM-1871] Remove google api BackOff usage from sdks/core
Posted by lc...@apache.org.
[BEAM-1871] Remove google api BackOff usage from sdks/core
This closes #2982
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5763c384
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5763c384
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5763c384
Branch: refs/heads/release-2.0.0
Commit: 5763c384d122d4c9c09d93f780dc5f65f09e9a27
Parents: e282601 8ea53a4
Author: Luke Cwik <lc...@google.com>
Authored: Tue May 9 09:55:16 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue May 9 09:55:16 2017 -0700
----------------------------------------------------------------------
.../beam/examples/common/ExampleUtils.java | 6 +-
.../beam/examples/WindowedWordCountIT.java | 2 +-
.../runners/dataflow/DataflowPipelineJob.java | 18 +++-
.../beam/runners/dataflow/util/PackageUtil.java | 3 +-
.../dataflow/DataflowPipelineJobTest.java | 13 ++-
.../runners/dataflow/util/PackageUtilTest.java | 2 +-
runners/spark/pom.xml | 5 -
.../beam/runners/spark/io/MicrobatchSource.java | 2 +-
sdks/java/core/pom.xml | 5 -
.../beam/sdk/coders/StructuralByteArray.java | 4 +-
.../sdk/io/BoundedReadFromUnboundedSource.java | 2 +-
.../beam/sdk/testing/FileChecksumMatcher.java | 2 +-
.../beam/sdk/testing/MatcherDeserializer.java | 4 +-
.../beam/sdk/testing/MatcherSerializer.java | 4 +-
.../java/org/apache/beam/sdk/util/BackOff.java | 81 ++++++++++++++++
.../org/apache/beam/sdk/util/BackOffUtils.java | 57 ++++++++++++
.../org/apache/beam/sdk/util/CoderUtils.java | 10 +-
.../beam/sdk/util/ExplicitShardedFile.java | 3 -
.../org/apache/beam/sdk/util/FluentBackoff.java | 1 -
.../beam/sdk/util/NumberedShardedFile.java | 3 -
.../org/apache/beam/sdk/util/ShardedFile.java | 2 -
.../java/org/apache/beam/sdk/util/Sleeper.java | 48 ++++++++++
.../sdk/util/UploadIdResponseInterceptor.java | 60 ------------
.../org/apache/beam/SdkCoreApiSurfaceTest.java | 1 -
.../beam/sdk/testing/ExpectedLogsTest.java | 2 +-
.../sdk/testing/FastNanoClockAndSleeper.java | 47 ----------
.../testing/FastNanoClockAndSleeperTest.java | 47 ----------
.../sdk/testing/FileChecksumMatcherTest.java | 5 -
.../beam/sdk/testing/SystemNanoTimeSleeper.java | 2 +-
.../apache/beam/sdk/util/FluentBackoffTest.java | 1 -
.../beam/sdk/util/NumberedShardedFileTest.java | 14 ++-
.../util/UploadIdResponseInterceptorTest.java | 98 --------------------
.../sdk/extensions/gcp/options/GcpOptions.java | 3 +-
.../apache/beam/sdk/util/BackOffAdapter.java | 43 +++++++++
.../java/org/apache/beam/sdk/util/GcsUtil.java | 13 ++-
.../sdk/util/UploadIdResponseInterceptor.java | 60 ++++++++++++
.../beam/sdk/util/FastNanoClockAndSleeper.java | 47 ++++++++++
.../sdk/util/FastNanoClockAndSleeperTest.java | 47 ++++++++++
.../org/apache/beam/sdk/util/GcsUtilTest.java | 23 +++--
.../util/UploadIdResponseInterceptorTest.java | 98 ++++++++++++++++++++
.../io/gcp/bigquery/BigQueryServicesImpl.java | 81 +++++++---------
.../gcp/bigquery/BigQueryTableRowIterator.java | 7 +-
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 6 +-
.../gcp/bigquery/BigQueryServicesImplTest.java | 24 +++--
.../sdk/io/gcp/bigquery/FakeJobService.java | 12 ++-
.../beam/sdk/io/gcp/datastore/V1TestUtil.java | 6 +-
.../sdk/io/gcp/testing/BigqueryMatcher.java | 4 +-
.../sdk/io/gcp/testing/BigqueryMatcherTest.java | 7 +-
48 files changed, 633 insertions(+), 402 deletions(-)
----------------------------------------------------------------------