You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/29 23:02:59 UTC

[1/2] beam git commit: Removes unnecessary calls to ValueProvider.isAccessible

Repository: beam
Updated Branches:
  refs/heads/master 6280d497b -> c1a757476


Removes unnecessary calls to ValueProvider.isAccessible


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97810b4b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97810b4b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97810b4b

Branch: refs/heads/master
Commit: 97810b4b23037fe333af103661bbb15acec96a57
Parents: 6280d49
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 17 19:44:17 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Aug 29 15:42:10 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 10 +----
 .../beam/sdk/io/DefaultFilenamePolicy.java      | 25 +++++------
 .../org/apache/beam/sdk/io/FileBasedSink.java   |  4 +-
 .../org/apache/beam/sdk/io/FileBasedSource.java | 25 +++--------
 .../java/org/apache/beam/sdk/io/TFRecordIO.java | 15 +------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 14 +-----
 .../java/org/apache/beam/sdk/io/WriteFiles.java |  6 +--
 .../apache/beam/sdk/options/ValueProvider.java  | 18 +++++---
 .../sdk/transforms/display/DisplayData.java     |  8 ++--
 .../org/apache/beam/sdk/io/WriteFilesTest.java  |  2 +-
 .../beam/sdk/options/ValueProviderTest.java     | 15 +++----
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 47 +++++++++++---------
 .../io/gcp/bigquery/BigQueryTableSource.java    |  2 -
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 35 +++++++--------
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  |  6 +--
 .../io/gcp/pubsub/PubsubUnboundedSource.java    | 23 +++-------
 16 files changed, 95 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 910d8e2..9e0422e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -989,19 +989,11 @@ public class AvroIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       resolveDynamicDestinations().populateDisplayData(builder);
-
-      String tempDirectory = null;
-      if (getTempDirectory() != null) {
-        tempDirectory =
-            getTempDirectory().isAccessible()
-                ? getTempDirectory().get().toString()
-                : getTempDirectory().toString();
-      }
       builder
           .addIfNotDefault(
               DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0)
           .addIfNotNull(
-              DisplayData.item("tempDirectory", tempDirectory)
+              DisplayData.item("tempDirectory", getTempDirectory())
                   .withLabel("Directory for temporary files"));
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index 1f438d5..2f22e82 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -368,26 +368,21 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
 
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
-    String filenamePattern;
-    if (params.baseFilename.isAccessible()) {
-      filenamePattern =
-          String.format("%s%s%s", params.baseFilename.get(), params.shardTemplate, params.suffix);
-    } else {
-      filenamePattern =
-          String.format("%s%s%s", params.baseFilename, params.shardTemplate, params.suffix);
-    }
-
-    String outputPrefixString = null;
-    outputPrefixString =
+    String displayBaseFilename =
         params.baseFilename.isAccessible()
             ? params.baseFilename.get().toString()
-            : params.baseFilename.toString();
-    builder.add(DisplayData.item("filenamePattern", filenamePattern).withLabel("Filename Pattern"));
-    builder.add(DisplayData.item("filePrefix", outputPrefixString).withLabel("Output File Prefix"));
-    builder.add(DisplayData.item("fileSuffix", params.suffix).withLabel("Output file Suffix"));
+            : ("(" + params.baseFilename + ")");
+    builder.add(
+        DisplayData.item(
+                "filenamePattern",
+                String.format("%s%s%s", displayBaseFilename, params.shardTemplate, params.suffix))
+            .withLabel("Filename pattern"));
+    builder.add(
+        DisplayData.item("filePrefix", params.baseFilename).withLabel("Output File Prefix"));
     builder.add(
         DisplayData.item("shardNameTemplate", params.shardTemplate)
             .withLabel("Output Shard Name Template"));
+    builder.add(DisplayData.item("fileSuffix", params.suffix).withLabel("Output file Suffix"));
   }
 
   private static String extractFilename(ResourceId input) {

http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 4e2b61c..d618647 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -747,12 +747,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
 
     @Override
     public String toString() {
-      String tempDirectoryStr =
-          tempDirectory.isAccessible() ? tempDirectory.get().toString() : tempDirectory.toString();
       return getClass().getSimpleName()
           + "{"
           + "tempDirectory="
-          + tempDirectoryStr
+          + tempDirectory
           + ", windowedWrites="
           + windowedWrites
           + '}';

http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index 7f865de..f835fa4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -211,10 +211,6 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     // This implementation of method getEstimatedSizeBytes is provided to simplify subclasses. Here
     // we perform the size estimation of files and file patterns using the interface provided by
     // FileSystem.
-    checkState(
-        fileOrPatternSpec.isAccessible(),
-        "Cannot estimate size of a FileBasedSource with inaccessible file pattern: {}.",
-        fileOrPatternSpec);
     String fileOrPattern = fileOrPatternSpec.get();
 
     if (mode == Mode.FILEPATTERN) {
@@ -240,10 +236,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
     if (mode == Mode.FILEPATTERN) {
-      String patternDisplay = getFileOrPatternSpecProvider().isAccessible()
-          ? getFileOrPatternSpecProvider().get()
-          : getFileOrPatternSpecProvider().toString();
-      builder.add(DisplayData.item("filePattern", patternDisplay).withLabel("File Pattern"));
+      builder.add(
+          DisplayData.item("filePattern", getFileOrPatternSpecProvider())
+              .withLabel("File Pattern"));
     }
   }
 
@@ -254,10 +249,6 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     // split a FileBasedSource based on a file pattern to FileBasedSources based on full single
     // files. For files that can be efficiently seeked, we further split FileBasedSources based on
     // those files to FileBasedSources based on sub ranges of single files.
-    checkState(
-        fileOrPatternSpec.isAccessible(),
-        "Cannot split a FileBasedSource without access to the file or pattern specification: {}.",
-        fileOrPatternSpec);
     String fileOrPattern = fileOrPatternSpec.get();
 
     if (mode == Mode.FILEPATTERN) {
@@ -326,10 +317,6 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   public final BoundedReader<T> createReader(PipelineOptions options) throws IOException {
     // Validate the current source prior to creating a reader for it.
     this.validate();
-    checkState(
-        fileOrPatternSpec.isAccessible(),
-        "Cannot create a file reader without access to the file or pattern specification: {}.",
-        fileOrPatternSpec);
     String fileOrPattern = fileOrPatternSpec.get();
 
     if (mode == Mode.FILEPATTERN) {
@@ -358,13 +345,11 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
 
   @Override
   public String toString() {
-    String fileString = fileOrPatternSpec.isAccessible()
-        ? fileOrPatternSpec.get() : fileOrPatternSpec.toString();
     switch (mode) {
       case FILEPATTERN:
-        return fileString;
+        return fileOrPatternSpec.toString();
       case SINGLE_FILE_OR_SUBRANGE:
-        return fileString + " range " + super.toString();
+        return fileOrPatternSpec + " range " + super.toString();
       default:
         throw new IllegalStateException("Unexpected mode: " + mode);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index c75051f..526c50e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -195,15 +195,12 @@ public class TFRecordIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-
-      String filepatternDisplay = getFilepattern().isAccessible()
-          ? getFilepattern().get() : getFilepattern().toString();
       builder
           .add(DisplayData.item("compressionType", getCompressionType().toString())
               .withLabel("Compression Type"))
           .addIfNotDefault(DisplayData.item("validation", getValidate())
               .withLabel("Validation Enabled"), true)
-          .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
+          .addIfNotNull(DisplayData.item("filePattern", getFilepattern())
               .withLabel("File Pattern"));
     }
   }
@@ -360,16 +357,8 @@ public class TFRecordIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-
-      String outputPrefixString = null;
-      if (getOutputPrefix().isAccessible()) {
-        ResourceId dir = getOutputPrefix().get();
-        outputPrefixString = dir.toString();
-      } else {
-        outputPrefixString = getOutputPrefix().toString();
-      }
       builder
-          .add(DisplayData.item("filePrefix", outputPrefixString)
+          .add(DisplayData.item("filePrefix", getOutputPrefix())
               .withLabel("Output File Prefix"))
           .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix())
               .withLabel("Output File Suffix"))

http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 612f5c5..cbc17ff 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -399,15 +399,12 @@ public class TextIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-
-      String filepatternDisplay = getFilepattern().isAccessible()
-        ? getFilepattern().get() : getFilepattern().toString();
       builder
           .add(
               DisplayData.item("compressionType", getCompressionType().toString())
                   .withLabel("Compression Type"))
           .addIfNotNull(
-              DisplayData.item("filePattern", filepatternDisplay).withLabel("File Pattern"))
+              DisplayData.item("filePattern", getFilepattern()).withLabel("File Pattern"))
           .add(
               DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
                   .withLabel("Treatment of filepatterns that match no files"))
@@ -904,18 +901,11 @@ public class TextIO {
       super.populateDisplayData(builder);
 
       resolveDynamicDestinations().populateDisplayData(builder);
-      String tempDirectory = null;
-      if (getTempDirectory() != null) {
-        tempDirectory =
-            getTempDirectory().isAccessible()
-                ? getTempDirectory().get().toString()
-                : getTempDirectory().toString();
-      }
       builder
           .addIfNotDefault(
               DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0)
           .addIfNotNull(
-              DisplayData.item("tempDirectory", tempDirectory)
+              DisplayData.item("tempDirectory", getTempDirectory())
                   .withLabel("Directory for temporary files"))
           .addIfNotNull(DisplayData.item("fileHeader", getHeader()).withLabel("File Header"))
           .addIfNotNull(DisplayData.item("fileFooter", getFooter()).withLabel("File Footer"))

http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 85c5652..7878c73 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -205,10 +205,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         .include("sink", sink);
     if (getSharding() != null) {
       builder.include("sharding", getSharding());
-    } else if (getNumShards() != null) {
-      String numShards = getNumShards().isAccessible()
-          ? getNumShards().get().toString() : getNumShards().toString();
-      builder.add(DisplayData.item("numShards", numShards)
+    } else {
+      builder.addIfNotNull(DisplayData.item("numShards", getNumShards())
           .withLabel("Fixed Number of Shards"));
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
index 94187a9..15413e8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
@@ -101,9 +101,7 @@ public interface ValueProvider<T> extends Serializable {
 
     @Override
     public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("value", value)
-          .toString();
+      return String.valueOf(value);
     }
   }
 
@@ -160,8 +158,12 @@ public interface ValueProvider<T> extends Serializable {
 
     @Override
     public String toString() {
+      if (isAccessible()) {
+        return String.valueOf(get());
+      }
       return MoreObjects.toStringHelper(this)
           .add("value", value)
+          .add("translator", translator.getClass().getSimpleName())
           .toString();
     }
   }
@@ -226,7 +228,8 @@ public interface ValueProvider<T> extends Serializable {
     public T get() {
       PipelineOptions options = optionsMap.get(optionsId);
       if (options == null) {
-        throw new RuntimeException("Not called from a runtime context.");
+        throw new IllegalStateException(
+            "Value only available at runtime, but accessed from a non-runtime context: " + this);
       }
       try {
         Method method = klass.getMethod(methodName);
@@ -249,8 +252,7 @@ public interface ValueProvider<T> extends Serializable {
 
     @Override
     public boolean isAccessible() {
-      PipelineOptions options = optionsMap.get(optionsId);
-      return options != null;
+      return optionsMap.get(optionsId) != null;
     }
 
     /**
@@ -262,10 +264,12 @@ public interface ValueProvider<T> extends Serializable {
 
     @Override
     public String toString() {
+      if (isAccessible()) {
+        return String.valueOf(get());
+      }
       return MoreObjects.toStringHelper(this)
           .add("propertyName", propertyName)
           .add("default", defaultValue)
-          .add("value", isAccessible() ? get() : null)
           .toString();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 3c4337b..10ef428 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -882,12 +882,12 @@ public class DisplayData implements Serializable {
         return item(key, Type.STRING, null);
       }
       Type type = inferType(got);
-      if (type == null) {
-        throw new RuntimeException(String.format("Unknown value type: %s", got));
+      if (type != null) {
+        return item(key, type, got);
       }
-      return item(key, type, got);
     }
-    return item(key, Type.STRING, value.toString());
+    // General case: not null and type not inferable. Fall back to toString of the VP itself.
+    return item(key, Type.STRING, String.valueOf(value));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index 1d4ce08..5e0d685 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -518,7 +518,7 @@ public class WriteFilesTest {
     DisplayData displayData = DisplayData.from(write);
     assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
     assertThat(displayData, includesDisplayDataFor("sink", sink));
-    assertThat(displayData, hasDisplayItem("numShards", "1"));
+    assertThat(displayData, hasDisplayItem("numShards", 1));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
index e596cc1..7bbbf7e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
@@ -88,7 +88,7 @@ public class ValueProviderTest {
     ValueProvider<String> provider = StaticValueProvider.of("foo");
     assertEquals("foo", provider.get());
     assertTrue(provider.isAccessible());
-    assertEquals("StaticValueProvider{value=foo}", provider.toString());
+    assertEquals("foo", provider.toString());
   }
 
   @Test
@@ -97,8 +97,9 @@ public class ValueProviderTest {
     ValueProvider<String> provider = options.getFoo();
     assertFalse(provider.isAccessible());
 
-    expectedException.expect(RuntimeException.class);
-    expectedException.expectMessage("Not called from a runtime context");
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage("Value only available at runtime");
+    expectedException.expectMessage("foo");
     provider.get();
   }
 
@@ -108,7 +109,7 @@ public class ValueProviderTest {
     ValueProvider<String> provider = options.getFoo();
     assertEquals("foo", ((RuntimeValueProvider) provider).propertyName());
     assertEquals(
-        "RuntimeValueProvider{propertyName=foo, default=null, value=null}",
+        "RuntimeValueProvider{propertyName=foo, default=null}",
         provider.toString());
   }
 
@@ -239,9 +240,7 @@ public class ValueProviderTest {
       });
     assertTrue(nvp.isAccessible());
     assertEquals("foobar", nvp.get());
-    assertEquals(
-        "NestedValueProvider{value=StaticValueProvider{value=foo}}",
-        nvp.toString());
+    assertEquals("foobar", nvp.toString());
   }
 
   @Test
@@ -266,7 +265,7 @@ public class ValueProviderTest {
     assertEquals("bar", ((NestedValueProvider) doubleNvp).propertyName());
     assertFalse(nvp.isAccessible());
     expectedException.expect(RuntimeException.class);
-    expectedException.expectMessage("Not called from a runtime context");
+    expectedException.expectMessage("Value only available at runtime");
     nvp.get();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 29828e4..1e0ab30 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -451,8 +451,7 @@ public class BigQueryIO {
 
     private BigQuerySourceBase createSource(String jobUuid) {
       BigQuerySourceBase source;
-      if (getQuery() == null
-          || (getQuery().isAccessible() && Strings.isNullOrEmpty(getQuery().get()))) {
+      if (getQuery() == null) {
         source = BigQueryTableSource.create(jobUuid, getTableProvider(), getBigQueryServices());
       } else {
         source =
@@ -517,26 +516,30 @@ public class BigQueryIO {
       // Note that a table or query check can fail if the table or dataset are created by
       // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
       // For these cases the withoutValidation method can be used to disable the check.
-      if (getValidate() && table != null && table.isAccessible()
-          && table.get().getProjectId() != null) {
-        checkState(table.isAccessible(), "Cannot call validate if table is dynamically set.");
-        // Check for source table presence for early failure notification.
-        DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
-        BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
-        BigQueryHelpers.verifyTablePresence(datasetService, table.get());
-      } else if (getValidate() && getQuery() != null) {
-        checkState(getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
-        JobService jobService = getBigQueryServices().getJobService(bqOptions);
-        try {
-          jobService.dryRunQuery(
-              bqOptions.getProject(),
-              new JobConfigurationQuery()
-                  .setQuery(getQuery().get())
-                  .setFlattenResults(getFlattenResults())
-                  .setUseLegacySql(getUseLegacySql()));
-        } catch (Exception e) {
-          throw new IllegalArgumentException(
-              String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
+      if (getValidate()) {
+        if (table != null) {
+          checkState(table.isAccessible(), "Cannot call validate if table is dynamically set.");
+        }
+        if (table != null && table.get().getProjectId() != null) {
+          // Check for source table presence for early failure notification.
+          DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
+          BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
+          BigQueryHelpers.verifyTablePresence(datasetService, table.get());
+        } else if (getQuery() != null) {
+          checkState(
+              getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
+          JobService jobService = getBigQueryServices().getJobService(bqOptions);
+          try {
+            jobService.dryRunQuery(
+                bqOptions.getProject(),
+                new JobConfigurationQuery()
+                    .setQuery(getQuery().get())
+                    .setFlattenResults(getFlattenResults())
+                    .setUseLegacySql(getUseLegacySql()));
+          } catch (Exception e) {
+            throw new IllegalArgumentException(
+                String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index 1d45641..52b8259 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -63,7 +63,6 @@ class BigQueryTableSource extends BigQuerySourceBase {
 
   @Override
   protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException {
-    checkState(jsonTable.isAccessible());
     TableReference tableReference =
         BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
     return setDefaultProjectIfAbsent(bqOptions, tableReference);
@@ -94,7 +93,6 @@ class BigQueryTableSource extends BigQuerySourceBase {
   @Override
   public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-    checkState(jsonTable.isAccessible());
     TableReference tableRef = BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(),
         TableReference.class);
     return new BigQueryReader(this, bqServices.getReaderFromTable(bqOptions, tableRef));

http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 46c2df4..e3780b4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -146,17 +146,11 @@ public class PubsubIO {
   private static void populateCommonDisplayData(DisplayData.Builder builder,
       String timestampAttribute, String idAttribute, ValueProvider<PubsubTopic> topic) {
     builder
-        .addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute)
-            .withLabel("Timestamp Attribute"))
-        .addIfNotNull(DisplayData.item("idAttribute", idAttribute)
-            .withLabel("ID Attribute"));
-
-    if (topic != null) {
-      String topicString = topic.isAccessible() ? topic.get().asPath()
-          : topic.toString();
-      builder.add(DisplayData.item("topic", topicString)
-          .withLabel("Pubsub Topic"));
-    }
+        .addIfNotNull(
+            DisplayData.item("timestampAttribute", timestampAttribute)
+                .withLabel("Timestamp Attribute"))
+        .addIfNotNull(DisplayData.item("idAttribute", idAttribute).withLabel("ID Attribute"))
+        .addIfNotNull(DisplayData.item("topic", topic).withLabel("Pubsub Topic"));
   }
 
   /**
@@ -263,6 +257,11 @@ public class PubsubIO {
         return subscription;
       }
     }
+
+    @Override
+    public String toString() {
+      return asPath();
+    }
   }
 
   /**
@@ -428,6 +427,11 @@ public class PubsubIO {
         return topic;
       }
     }
+
+    @Override
+    public String toString() {
+      return asPath();
+    }
   }
 
    /** Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. */
@@ -734,13 +738,8 @@ public class PubsubIO {
       super.populateDisplayData(builder);
       populateCommonDisplayData(
           builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider());
-
-      if (getSubscriptionProvider() != null) {
-        String subscriptionString = getSubscriptionProvider().isAccessible()
-            ? getSubscriptionProvider().get().asPath() : getSubscriptionProvider().toString();
-        builder.add(DisplayData.item("subscription", subscriptionString)
-            .withLabel("Pubsub Subscription"));
-      }
+      builder.addIfNotNull(DisplayData.item("subscription", getSubscriptionProvider())
+          .withLabel("Pubsub Subscription"));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index ad38e28..a8f6fa2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -295,11 +295,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
     @Override
     public void populateDisplayData(Builder builder) {
       super.populateDisplayData(builder);
-        String topicString =
-            topic == null ? null
-            : topic.isAccessible() ? topic.get().getPath()
-            : topic.toString();
-      builder.add(DisplayData.item("topic", topicString));
+      builder.add(DisplayData.item("topic", topic));
       builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
       builder.addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute));
       builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute));

http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 8da6ff4..bf3a121 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -1222,21 +1222,12 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
     @Override
     public void populateDisplayData(Builder builder) {
       super.populateDisplayData(builder);
-      if (subscription != null) {
-        String subscriptionString = subscription.isAccessible()
-            ? subscription.get().getPath()
-            : subscription.toString();
-        builder.add(DisplayData.item("subscription", subscriptionString));
-      }
-      if (topic != null) {
-        String topicString = topic.isAccessible()
-            ? topic.get().getPath()
-            : topic.toString();
-        builder.add(DisplayData.item("topic", topicString));
-      }
-      builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
-      builder.addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute));
-      builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute));
+      builder
+          .addIfNotNull(DisplayData.item("subscription", subscription))
+          .addIfNotNull(DisplayData.item("topic", topic))
+          .add(DisplayData.item("transport", pubsubFactory.getKind()))
+          .addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute))
+          .addIfNotNull(DisplayData.item("idAttribute", idAttribute));
     }
   }
 
@@ -1416,8 +1407,6 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
       try (PubsubClient pubsubClient =
           pubsubFactory.newClient(
               timestampAttribute, idAttribute, options.as(PubsubOptions.class))) {
-        checkState(project.isAccessible(), "createRandomSubscription must be called at runtime.");
-        checkState(topic.isAccessible(), "createRandomSubscription must be called at runtime.");
         SubscriptionPath subscriptionPath =
             pubsubClient.createRandomSubscription(
                 project.get(), topic.get(), DEAULT_ACK_TIMEOUT_SEC);


[2/2] beam git commit: This closes #3732: Removes unnecessary calls to ValueProvider.isAccessible

Posted by jk...@apache.org.
This closes #3732: Removes unnecessary calls to ValueProvider.isAccessible


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c1a75747
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c1a75747
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c1a75747

Branch: refs/heads/master
Commit: c1a75747664a8f70886dfede109413fc247e9b24
Parents: 6280d49 97810b4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Aug 29 15:42:31 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Aug 29 15:42:31 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 10 +----
 .../beam/sdk/io/DefaultFilenamePolicy.java      | 25 +++++------
 .../org/apache/beam/sdk/io/FileBasedSink.java   |  4 +-
 .../org/apache/beam/sdk/io/FileBasedSource.java | 25 +++--------
 .../java/org/apache/beam/sdk/io/TFRecordIO.java | 15 +------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 14 +-----
 .../java/org/apache/beam/sdk/io/WriteFiles.java |  6 +--
 .../apache/beam/sdk/options/ValueProvider.java  | 18 +++++---
 .../sdk/transforms/display/DisplayData.java     |  8 ++--
 .../org/apache/beam/sdk/io/WriteFilesTest.java  |  2 +-
 .../beam/sdk/options/ValueProviderTest.java     | 15 +++----
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 47 +++++++++++---------
 .../io/gcp/bigquery/BigQueryTableSource.java    |  2 -
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 35 +++++++--------
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  |  6 +--
 .../io/gcp/pubsub/PubsubUnboundedSource.java    | 23 +++-------
 16 files changed, 95 insertions(+), 160 deletions(-)
----------------------------------------------------------------------