You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/29 23:02:59 UTC
[1/2] beam git commit: Removes unnecessary calls to ValueProvider.isAccessible
Repository: beam
Updated Branches:
refs/heads/master 6280d497b -> c1a757476
Removes unnecessary calls to ValueProvider.isAccessible
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97810b4b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97810b4b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97810b4b
Branch: refs/heads/master
Commit: 97810b4b23037fe333af103661bbb15acec96a57
Parents: 6280d49
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 17 19:44:17 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Aug 29 15:42:10 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 10 +----
.../beam/sdk/io/DefaultFilenamePolicy.java | 25 +++++------
.../org/apache/beam/sdk/io/FileBasedSink.java | 4 +-
.../org/apache/beam/sdk/io/FileBasedSource.java | 25 +++--------
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 15 +------
.../java/org/apache/beam/sdk/io/TextIO.java | 14 +-----
.../java/org/apache/beam/sdk/io/WriteFiles.java | 6 +--
.../apache/beam/sdk/options/ValueProvider.java | 18 +++++---
.../sdk/transforms/display/DisplayData.java | 8 ++--
.../org/apache/beam/sdk/io/WriteFilesTest.java | 2 +-
.../beam/sdk/options/ValueProviderTest.java | 15 +++----
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 47 +++++++++++---------
.../io/gcp/bigquery/BigQueryTableSource.java | 2 -
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 35 +++++++--------
.../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 6 +--
.../io/gcp/pubsub/PubsubUnboundedSource.java | 23 +++-------
16 files changed, 95 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 910d8e2..9e0422e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -989,19 +989,11 @@ public class AvroIO {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
resolveDynamicDestinations().populateDisplayData(builder);
-
- String tempDirectory = null;
- if (getTempDirectory() != null) {
- tempDirectory =
- getTempDirectory().isAccessible()
- ? getTempDirectory().get().toString()
- : getTempDirectory().toString();
- }
builder
.addIfNotDefault(
DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0)
.addIfNotNull(
- DisplayData.item("tempDirectory", tempDirectory)
+ DisplayData.item("tempDirectory", getTempDirectory())
.withLabel("Directory for temporary files"));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index 1f438d5..2f22e82 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -368,26 +368,21 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- String filenamePattern;
- if (params.baseFilename.isAccessible()) {
- filenamePattern =
- String.format("%s%s%s", params.baseFilename.get(), params.shardTemplate, params.suffix);
- } else {
- filenamePattern =
- String.format("%s%s%s", params.baseFilename, params.shardTemplate, params.suffix);
- }
-
- String outputPrefixString = null;
- outputPrefixString =
+ String displayBaseFilename =
params.baseFilename.isAccessible()
? params.baseFilename.get().toString()
- : params.baseFilename.toString();
- builder.add(DisplayData.item("filenamePattern", filenamePattern).withLabel("Filename Pattern"));
- builder.add(DisplayData.item("filePrefix", outputPrefixString).withLabel("Output File Prefix"));
- builder.add(DisplayData.item("fileSuffix", params.suffix).withLabel("Output file Suffix"));
+ : ("(" + params.baseFilename + ")");
+ builder.add(
+ DisplayData.item(
+ "filenamePattern",
+ String.format("%s%s%s", displayBaseFilename, params.shardTemplate, params.suffix))
+ .withLabel("Filename pattern"));
+ builder.add(
+ DisplayData.item("filePrefix", params.baseFilename).withLabel("Output File Prefix"));
builder.add(
DisplayData.item("shardNameTemplate", params.shardTemplate)
.withLabel("Output Shard Name Template"));
+ builder.add(DisplayData.item("fileSuffix", params.suffix).withLabel("Output file Suffix"));
}
private static String extractFilename(ResourceId input) {
http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 4e2b61c..d618647 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -747,12 +747,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
@Override
public String toString() {
- String tempDirectoryStr =
- tempDirectory.isAccessible() ? tempDirectory.get().toString() : tempDirectory.toString();
return getClass().getSimpleName()
+ "{"
+ "tempDirectory="
- + tempDirectoryStr
+ + tempDirectory
+ ", windowedWrites="
+ windowedWrites
+ '}';
http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index 7f865de..f835fa4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -211,10 +211,6 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
// This implementation of method getEstimatedSizeBytes is provided to simplify subclasses. Here
// we perform the size estimation of files and file patterns using the interface provided by
// FileSystem.
- checkState(
- fileOrPatternSpec.isAccessible(),
- "Cannot estimate size of a FileBasedSource with inaccessible file pattern: {}.",
- fileOrPatternSpec);
String fileOrPattern = fileOrPatternSpec.get();
if (mode == Mode.FILEPATTERN) {
@@ -240,10 +236,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
if (mode == Mode.FILEPATTERN) {
- String patternDisplay = getFileOrPatternSpecProvider().isAccessible()
- ? getFileOrPatternSpecProvider().get()
- : getFileOrPatternSpecProvider().toString();
- builder.add(DisplayData.item("filePattern", patternDisplay).withLabel("File Pattern"));
+ builder.add(
+ DisplayData.item("filePattern", getFileOrPatternSpecProvider())
+ .withLabel("File Pattern"));
}
}
@@ -254,10 +249,6 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
// split a FileBasedSource based on a file pattern to FileBasedSources based on full single
// files. For files that can be efficiently seeked, we further split FileBasedSources based on
// those files to FileBasedSources based on sub ranges of single files.
- checkState(
- fileOrPatternSpec.isAccessible(),
- "Cannot split a FileBasedSource without access to the file or pattern specification: {}.",
- fileOrPatternSpec);
String fileOrPattern = fileOrPatternSpec.get();
if (mode == Mode.FILEPATTERN) {
@@ -326,10 +317,6 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
public final BoundedReader<T> createReader(PipelineOptions options) throws IOException {
// Validate the current source prior to creating a reader for it.
this.validate();
- checkState(
- fileOrPatternSpec.isAccessible(),
- "Cannot create a file reader without access to the file or pattern specification: {}.",
- fileOrPatternSpec);
String fileOrPattern = fileOrPatternSpec.get();
if (mode == Mode.FILEPATTERN) {
@@ -358,13 +345,11 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
@Override
public String toString() {
- String fileString = fileOrPatternSpec.isAccessible()
- ? fileOrPatternSpec.get() : fileOrPatternSpec.toString();
switch (mode) {
case FILEPATTERN:
- return fileString;
+ return fileOrPatternSpec.toString();
case SINGLE_FILE_OR_SUBRANGE:
- return fileString + " range " + super.toString();
+ return fileOrPatternSpec + " range " + super.toString();
default:
throw new IllegalStateException("Unexpected mode: " + mode);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index c75051f..526c50e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -195,15 +195,12 @@ public class TFRecordIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
-
- String filepatternDisplay = getFilepattern().isAccessible()
- ? getFilepattern().get() : getFilepattern().toString();
builder
.add(DisplayData.item("compressionType", getCompressionType().toString())
.withLabel("Compression Type"))
.addIfNotDefault(DisplayData.item("validation", getValidate())
.withLabel("Validation Enabled"), true)
- .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
+ .addIfNotNull(DisplayData.item("filePattern", getFilepattern())
.withLabel("File Pattern"));
}
}
@@ -360,16 +357,8 @@ public class TFRecordIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
-
- String outputPrefixString = null;
- if (getOutputPrefix().isAccessible()) {
- ResourceId dir = getOutputPrefix().get();
- outputPrefixString = dir.toString();
- } else {
- outputPrefixString = getOutputPrefix().toString();
- }
builder
- .add(DisplayData.item("filePrefix", outputPrefixString)
+ .add(DisplayData.item("filePrefix", getOutputPrefix())
.withLabel("Output File Prefix"))
.addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix())
.withLabel("Output File Suffix"))
http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 612f5c5..cbc17ff 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -399,15 +399,12 @@ public class TextIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
-
- String filepatternDisplay = getFilepattern().isAccessible()
- ? getFilepattern().get() : getFilepattern().toString();
builder
.add(
DisplayData.item("compressionType", getCompressionType().toString())
.withLabel("Compression Type"))
.addIfNotNull(
- DisplayData.item("filePattern", filepatternDisplay).withLabel("File Pattern"))
+ DisplayData.item("filePattern", getFilepattern()).withLabel("File Pattern"))
.add(
DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
.withLabel("Treatment of filepatterns that match no files"))
@@ -904,18 +901,11 @@ public class TextIO {
super.populateDisplayData(builder);
resolveDynamicDestinations().populateDisplayData(builder);
- String tempDirectory = null;
- if (getTempDirectory() != null) {
- tempDirectory =
- getTempDirectory().isAccessible()
- ? getTempDirectory().get().toString()
- : getTempDirectory().toString();
- }
builder
.addIfNotDefault(
DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0)
.addIfNotNull(
- DisplayData.item("tempDirectory", tempDirectory)
+ DisplayData.item("tempDirectory", getTempDirectory())
.withLabel("Directory for temporary files"))
.addIfNotNull(DisplayData.item("fileHeader", getHeader()).withLabel("File Header"))
.addIfNotNull(DisplayData.item("fileFooter", getFooter()).withLabel("File Footer"))
http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 85c5652..7878c73 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -205,10 +205,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
.include("sink", sink);
if (getSharding() != null) {
builder.include("sharding", getSharding());
- } else if (getNumShards() != null) {
- String numShards = getNumShards().isAccessible()
- ? getNumShards().get().toString() : getNumShards().toString();
- builder.add(DisplayData.item("numShards", numShards)
+ } else {
+ builder.addIfNotNull(DisplayData.item("numShards", getNumShards())
.withLabel("Fixed Number of Shards"));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
index 94187a9..15413e8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
@@ -101,9 +101,7 @@ public interface ValueProvider<T> extends Serializable {
@Override
public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("value", value)
- .toString();
+ return String.valueOf(value);
}
}
@@ -160,8 +158,12 @@ public interface ValueProvider<T> extends Serializable {
@Override
public String toString() {
+ if (isAccessible()) {
+ return String.valueOf(get());
+ }
return MoreObjects.toStringHelper(this)
.add("value", value)
+ .add("translator", translator.getClass().getSimpleName())
.toString();
}
}
@@ -226,7 +228,8 @@ public interface ValueProvider<T> extends Serializable {
public T get() {
PipelineOptions options = optionsMap.get(optionsId);
if (options == null) {
- throw new RuntimeException("Not called from a runtime context.");
+ throw new IllegalStateException(
+ "Value only available at runtime, but accessed from a non-runtime context: " + this);
}
try {
Method method = klass.getMethod(methodName);
@@ -249,8 +252,7 @@ public interface ValueProvider<T> extends Serializable {
@Override
public boolean isAccessible() {
- PipelineOptions options = optionsMap.get(optionsId);
- return options != null;
+ return optionsMap.get(optionsId) != null;
}
/**
@@ -262,10 +264,12 @@ public interface ValueProvider<T> extends Serializable {
@Override
public String toString() {
+ if (isAccessible()) {
+ return String.valueOf(get());
+ }
return MoreObjects.toStringHelper(this)
.add("propertyName", propertyName)
.add("default", defaultValue)
- .add("value", isAccessible() ? get() : null)
.toString();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 3c4337b..10ef428 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -882,12 +882,12 @@ public class DisplayData implements Serializable {
return item(key, Type.STRING, null);
}
Type type = inferType(got);
- if (type == null) {
- throw new RuntimeException(String.format("Unknown value type: %s", got));
+ if (type != null) {
+ return item(key, type, got);
}
- return item(key, type, got);
}
- return item(key, Type.STRING, value.toString());
+ // General case: not null and type not inferable. Fall back to toString of the VP itself.
+ return item(key, Type.STRING, String.valueOf(value));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index 1d4ce08..5e0d685 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -518,7 +518,7 @@ public class WriteFilesTest {
DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
assertThat(displayData, includesDisplayDataFor("sink", sink));
- assertThat(displayData, hasDisplayItem("numShards", "1"));
+ assertThat(displayData, hasDisplayItem("numShards", 1));
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
index e596cc1..7bbbf7e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
@@ -88,7 +88,7 @@ public class ValueProviderTest {
ValueProvider<String> provider = StaticValueProvider.of("foo");
assertEquals("foo", provider.get());
assertTrue(provider.isAccessible());
- assertEquals("StaticValueProvider{value=foo}", provider.toString());
+ assertEquals("foo", provider.toString());
}
@Test
@@ -97,8 +97,9 @@ public class ValueProviderTest {
ValueProvider<String> provider = options.getFoo();
assertFalse(provider.isAccessible());
- expectedException.expect(RuntimeException.class);
- expectedException.expectMessage("Not called from a runtime context");
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Value only available at runtime");
+ expectedException.expectMessage("foo");
provider.get();
}
@@ -108,7 +109,7 @@ public class ValueProviderTest {
ValueProvider<String> provider = options.getFoo();
assertEquals("foo", ((RuntimeValueProvider) provider).propertyName());
assertEquals(
- "RuntimeValueProvider{propertyName=foo, default=null, value=null}",
+ "RuntimeValueProvider{propertyName=foo, default=null}",
provider.toString());
}
@@ -239,9 +240,7 @@ public class ValueProviderTest {
});
assertTrue(nvp.isAccessible());
assertEquals("foobar", nvp.get());
- assertEquals(
- "NestedValueProvider{value=StaticValueProvider{value=foo}}",
- nvp.toString());
+ assertEquals("foobar", nvp.toString());
}
@Test
@@ -266,7 +265,7 @@ public class ValueProviderTest {
assertEquals("bar", ((NestedValueProvider) doubleNvp).propertyName());
assertFalse(nvp.isAccessible());
expectedException.expect(RuntimeException.class);
- expectedException.expectMessage("Not called from a runtime context");
+ expectedException.expectMessage("Value only available at runtime");
nvp.get();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 29828e4..1e0ab30 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -451,8 +451,7 @@ public class BigQueryIO {
private BigQuerySourceBase createSource(String jobUuid) {
BigQuerySourceBase source;
- if (getQuery() == null
- || (getQuery().isAccessible() && Strings.isNullOrEmpty(getQuery().get()))) {
+ if (getQuery() == null) {
source = BigQueryTableSource.create(jobUuid, getTableProvider(), getBigQueryServices());
} else {
source =
@@ -517,26 +516,30 @@ public class BigQueryIO {
// Note that a table or query check can fail if the table or dataset are created by
// earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
// For these cases the withoutValidation method can be used to disable the check.
- if (getValidate() && table != null && table.isAccessible()
- && table.get().getProjectId() != null) {
- checkState(table.isAccessible(), "Cannot call validate if table is dynamically set.");
- // Check for source table presence for early failure notification.
- DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
- BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
- BigQueryHelpers.verifyTablePresence(datasetService, table.get());
- } else if (getValidate() && getQuery() != null) {
- checkState(getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
- JobService jobService = getBigQueryServices().getJobService(bqOptions);
- try {
- jobService.dryRunQuery(
- bqOptions.getProject(),
- new JobConfigurationQuery()
- .setQuery(getQuery().get())
- .setFlattenResults(getFlattenResults())
- .setUseLegacySql(getUseLegacySql()));
- } catch (Exception e) {
- throw new IllegalArgumentException(
- String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
+ if (getValidate()) {
+ if (table != null) {
+ checkState(table.isAccessible(), "Cannot call validate if table is dynamically set.");
+ }
+ if (table != null && table.get().getProjectId() != null) {
+ // Check for source table presence for early failure notification.
+ DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
+ BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
+ BigQueryHelpers.verifyTablePresence(datasetService, table.get());
+ } else if (getQuery() != null) {
+ checkState(
+ getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
+ JobService jobService = getBigQueryServices().getJobService(bqOptions);
+ try {
+ jobService.dryRunQuery(
+ bqOptions.getProject(),
+ new JobConfigurationQuery()
+ .setQuery(getQuery().get())
+ .setFlattenResults(getFlattenResults())
+ .setUseLegacySql(getUseLegacySql()));
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index 1d45641..52b8259 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -63,7 +63,6 @@ class BigQueryTableSource extends BigQuerySourceBase {
@Override
protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException {
- checkState(jsonTable.isAccessible());
TableReference tableReference =
BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
return setDefaultProjectIfAbsent(bqOptions, tableReference);
@@ -94,7 +93,6 @@ class BigQueryTableSource extends BigQuerySourceBase {
@Override
public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- checkState(jsonTable.isAccessible());
TableReference tableRef = BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(),
TableReference.class);
return new BigQueryReader(this, bqServices.getReaderFromTable(bqOptions, tableRef));
http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 46c2df4..e3780b4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -146,17 +146,11 @@ public class PubsubIO {
private static void populateCommonDisplayData(DisplayData.Builder builder,
String timestampAttribute, String idAttribute, ValueProvider<PubsubTopic> topic) {
builder
- .addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute)
- .withLabel("Timestamp Attribute"))
- .addIfNotNull(DisplayData.item("idAttribute", idAttribute)
- .withLabel("ID Attribute"));
-
- if (topic != null) {
- String topicString = topic.isAccessible() ? topic.get().asPath()
- : topic.toString();
- builder.add(DisplayData.item("topic", topicString)
- .withLabel("Pubsub Topic"));
- }
+ .addIfNotNull(
+ DisplayData.item("timestampAttribute", timestampAttribute)
+ .withLabel("Timestamp Attribute"))
+ .addIfNotNull(DisplayData.item("idAttribute", idAttribute).withLabel("ID Attribute"))
+ .addIfNotNull(DisplayData.item("topic", topic).withLabel("Pubsub Topic"));
}
/**
@@ -263,6 +257,11 @@ public class PubsubIO {
return subscription;
}
}
+
+ @Override
+ public String toString() {
+ return asPath();
+ }
}
/**
@@ -428,6 +427,11 @@ public class PubsubIO {
return topic;
}
}
+
+ @Override
+ public String toString() {
+ return asPath();
+ }
}
/** Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. */
@@ -734,13 +738,8 @@ public class PubsubIO {
super.populateDisplayData(builder);
populateCommonDisplayData(
builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider());
-
- if (getSubscriptionProvider() != null) {
- String subscriptionString = getSubscriptionProvider().isAccessible()
- ? getSubscriptionProvider().get().asPath() : getSubscriptionProvider().toString();
- builder.add(DisplayData.item("subscription", subscriptionString)
- .withLabel("Pubsub Subscription"));
- }
+ builder.addIfNotNull(DisplayData.item("subscription", getSubscriptionProvider())
+ .withLabel("Pubsub Subscription"));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index ad38e28..a8f6fa2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -295,11 +295,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
@Override
public void populateDisplayData(Builder builder) {
super.populateDisplayData(builder);
- String topicString =
- topic == null ? null
- : topic.isAccessible() ? topic.get().getPath()
- : topic.toString();
- builder.add(DisplayData.item("topic", topicString));
+ builder.add(DisplayData.item("topic", topic));
builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
builder.addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute));
builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute));
http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 8da6ff4..bf3a121 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -1222,21 +1222,12 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
@Override
public void populateDisplayData(Builder builder) {
super.populateDisplayData(builder);
- if (subscription != null) {
- String subscriptionString = subscription.isAccessible()
- ? subscription.get().getPath()
- : subscription.toString();
- builder.add(DisplayData.item("subscription", subscriptionString));
- }
- if (topic != null) {
- String topicString = topic.isAccessible()
- ? topic.get().getPath()
- : topic.toString();
- builder.add(DisplayData.item("topic", topicString));
- }
- builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
- builder.addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute));
- builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute));
+ builder
+ .addIfNotNull(DisplayData.item("subscription", subscription))
+ .addIfNotNull(DisplayData.item("topic", topic))
+ .add(DisplayData.item("transport", pubsubFactory.getKind()))
+ .addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute))
+ .addIfNotNull(DisplayData.item("idAttribute", idAttribute));
}
}
@@ -1416,8 +1407,6 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
try (PubsubClient pubsubClient =
pubsubFactory.newClient(
timestampAttribute, idAttribute, options.as(PubsubOptions.class))) {
- checkState(project.isAccessible(), "createRandomSubscription must be called at runtime.");
- checkState(topic.isAccessible(), "createRandomSubscription must be called at runtime.");
SubscriptionPath subscriptionPath =
pubsubClient.createRandomSubscription(
project.get(), topic.get(), DEAULT_ACK_TIMEOUT_SEC);
[2/2] beam git commit: This closes #3732: Removes unnecessary calls to ValueProvider.isAccessible
Posted by jk...@apache.org.
This closes #3732: Removes unnecessary calls to ValueProvider.isAccessible
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c1a75747
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c1a75747
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c1a75747
Branch: refs/heads/master
Commit: c1a75747664a8f70886dfede109413fc247e9b24
Parents: 6280d49 97810b4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Aug 29 15:42:31 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Aug 29 15:42:31 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 10 +----
.../beam/sdk/io/DefaultFilenamePolicy.java | 25 +++++------
.../org/apache/beam/sdk/io/FileBasedSink.java | 4 +-
.../org/apache/beam/sdk/io/FileBasedSource.java | 25 +++--------
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 15 +------
.../java/org/apache/beam/sdk/io/TextIO.java | 14 +-----
.../java/org/apache/beam/sdk/io/WriteFiles.java | 6 +--
.../apache/beam/sdk/options/ValueProvider.java | 18 +++++---
.../sdk/transforms/display/DisplayData.java | 8 ++--
.../org/apache/beam/sdk/io/WriteFilesTest.java | 2 +-
.../beam/sdk/options/ValueProviderTest.java | 15 +++----
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 47 +++++++++++---------
.../io/gcp/bigquery/BigQueryTableSource.java | 2 -
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 35 +++++++--------
.../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 6 +--
.../io/gcp/pubsub/PubsubUnboundedSource.java | 23 +++-------
16 files changed, 95 insertions(+), 160 deletions(-)
----------------------------------------------------------------------