You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this format.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/10/03 18:47:05 UTC
[1/2] beam git commit: [BEAM-2958] Adding user agent string to PipelineOptions.
Repository: beam
Updated Branches:
refs/heads/master 66d7b6f84 -> ba5bee668
[BEAM-2958] Adding user agent string to PipelineOptions.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a83cae5b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a83cae5b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a83cae5b
Branch: refs/heads/master
Commit: a83cae5b3e9535134507b0306355300a9a19dfa1
Parents: 66d7b6f
Author: Daniel Oliveira <da...@gmail.com>
Authored: Wed Sep 27 12:24:17 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Oct 3 11:46:31 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 6 ++++
.../runners/dataflow/DataflowRunnerTest.java | 12 +++++++
.../beam/sdk/options/PipelineOptions.java | 36 ++++++++++++++++++++
.../beam/sdk/options/PipelineOptionsTest.java | 11 ++++++
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 26 +++-----------
5 files changed, 70 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a83cae5b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index d5a9845..4cd3db0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -297,6 +297,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
dataflowOptions.setGcsUploadBufferSizeBytes(GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT);
}
+ DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
+ String userAgent = String
+ .format("%s/%s", dataflowRunnerInfo.getName(), dataflowRunnerInfo.getVersion())
+ .replace(" ", "_");
+ dataflowOptions.setUserAgent(userAgent);
+
return new DataflowRunner(dataflowOptions);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a83cae5b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index f1e3805..0e3c266 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -341,6 +341,18 @@ public class DataflowRunnerTest implements Serializable {
}
@Test
+ public void testFromOptionsUserAgentFromPipelineInfo() throws Exception {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ DataflowRunner.fromOptions(options);
+
+ String expectedName = DataflowRunnerInfo.getDataflowRunnerInfo().getName().replace(" ", "_");
+ assertThat(options.getUserAgent(), containsString(expectedName));
+
+ String expectedVersion = DataflowRunnerInfo.getDataflowRunnerInfo().getVersion();
+ assertThat(options.getUserAgent(), containsString(expectedVersion));
+ }
+
+ @Test
public void testRun() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
Pipeline p = buildDataflowPipeline(options);
http://git-wip-us.apache.org/repos/asf/beam/blob/a83cae5b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index 5cc0b3f..77117b6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer;
import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.util.ReleaseInfo;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
@@ -353,4 +354,39 @@ public interface PipelineOptions extends HasDisplayData {
return NEXT_ID.getAndIncrement();
}
}
+
+ /**
+ * A user agent string as per RFC2616, describing the pipeline to external services.
+ *
+ * <p>https://www.ietf.org/rfc/rfc2616.txt
+ *
+ * <p>It should follow the BNF Form:
+ * <pre><code>
+ * user agent = 1*(product | comment)
+ * product = token ["/" product-version]
+ * product-version = token
+ * </code></pre>
+ * Where a token is a series of characters without a separator.
+ *
+ * <p>The string defaults to {@code [name]/[version]} based on the properties of the Apache Beam
+ * release.
+ */
+ @Description("A user agent string describing the pipeline to external services."
+ + " The format should follow RFC2616. This option defaults to \"[name]/[version]\""
+ + " where name and version are properties of the Apache Beam release.")
+ @Default.InstanceFactory(UserAgentFactory.class)
+ String getUserAgent();
+ void setUserAgent(String userAgent);
+
+ /**
+ * Returns a user agent string constructed from {@link ReleaseInfo#getName()} and
+ * {@link ReleaseInfo#getVersion()}, in the format {@code [name]/[version]}.
+ */
+ class UserAgentFactory implements DefaultValueFactory<String> {
+ @Override
+ public String create(PipelineOptions options) {
+ ReleaseInfo info = ReleaseInfo.getReleaseInfo();
+ return String.format("%s/%s", info.getName(), info.getVersion()).replace(" ", "_");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a83cae5b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
index 5e3211f..7f80c0c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.options;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -36,6 +37,8 @@ import org.junit.runners.JUnit4;
/** Unit tests for {@link PipelineOptions}. */
@RunWith(JUnit4.class)
public class PipelineOptionsTest {
+ private static final String DEFAULT_USER_AGENT_NAME = "Apache_Beam_SDK_for_Java";
+
@Rule public ExpectedException expectedException = ExpectedException.none();
/** Interfaces used for testing that {@link PipelineOptions#as(Class)} functions. */
@@ -106,4 +109,12 @@ public class PipelineOptionsTest {
}
}
}
+
+ @Test
+ public void testUserAgentFactory() {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ String userAgent = options.getUserAgent();
+ assertNotNull(userAgent);
+ assertTrue(userAgent.contains(DEFAULT_USER_AGENT_NAME));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a83cae5b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 252f6c5..47efa08 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -60,7 +60,6 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -289,10 +288,9 @@ public class BigtableIO {
BigtableOptions.Builder clonedBuilder = options.toBuilder()
.setUseCachedDataPool(true);
- BigtableOptions optionsWithAgent =
- clonedBuilder.setUserAgent(getBeamSdkPartOfUserAgent()).build();
+ BigtableOptions clonedOptions = clonedBuilder.build();
- return toBuilder().setBigtableOptions(optionsWithAgent).build();
+ return toBuilder().setBigtableOptions(clonedOptions).build();
}
/**
@@ -498,9 +496,8 @@ public class BigtableIO {
.setUseBulkApi(true)
.build())
.setUseCachedDataPool(true);
- BigtableOptions optionsWithAgent =
- clonedBuilder.setUserAgent(getBeamSdkPartOfUserAgent()).build();
- return toBuilder().setBigtableOptions(optionsWithAgent).build();
+ BigtableOptions clonedOptions = clonedBuilder.build();
+ return toBuilder().setBigtableOptions(clonedOptions).build();
}
/** Disables validation that the table being written to exists. */
@@ -595,6 +592,7 @@ public class BigtableIO {
return getBigtableService();
}
BigtableOptions.Builder clonedOptions = getBigtableOptions().toBuilder();
+ clonedOptions.setUserAgent(pipelineOptions.getUserAgent());
if (getBigtableOptions().getCredentialOptions()
.getCredentialType() == CredentialType.DefaultCredentials) {
clonedOptions.setCredentialOptions(
@@ -1100,18 +1098,4 @@ public class BigtableIO {
cause);
}
}
-
- /**
- * A helper function to produce a Cloud Bigtable user agent string. This need only include
- * information about the Apache Beam SDK itself, because Bigtable will automatically append
- * other relevant system and Bigtable client-specific version information.
- *
- * @see com.google.cloud.bigtable.config.BigtableVersionInfo
- */
- private static String getBeamSdkPartOfUserAgent() {
- ReleaseInfo info = ReleaseInfo.getReleaseInfo();
- return
- String.format("%s/%s", info.getName(), info.getVersion())
- .replace(" ", "_");
- }
}
[2/2] beam git commit: [BEAM-2958] Adding a top-level user agent string to PipelineOptions
Posted by lc...@apache.org.
[BEAM-2958] Adding a top-level user agent string to PipelineOptions
This closes #3915
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ba5bee66
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ba5bee66
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ba5bee66
Branch: refs/heads/master
Commit: ba5bee668b9a5d8643a051f3b0742de500862b1f
Parents: 66d7b6f a83cae5
Author: Luke Cwik <lc...@google.com>
Authored: Tue Oct 3 11:46:56 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Oct 3 11:46:56 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 6 ++++
.../runners/dataflow/DataflowRunnerTest.java | 12 +++++++
.../beam/sdk/options/PipelineOptions.java | 36 ++++++++++++++++++++
.../beam/sdk/options/PipelineOptionsTest.java | 11 ++++++
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 26 +++-----------
5 files changed, 70 insertions(+), 21 deletions(-)
----------------------------------------------------------------------