You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/29 19:06:44 UTC

[2/5] beam git commit: Remove PipelineOptions from createWriteOperation()

Remove PipelineOptions from createWriteOperation()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4291fa6d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4291fa6d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4291fa6d

Branch: refs/heads/master
Commit: 4291fa6ddc75a7142cacb39025b613eca54c48c3
Parents: f6ebb04
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Apr 24 13:57:23 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Apr 29 10:41:33 2017 -0700

----------------------------------------------------------------------
 .../runners/core/construction/PTransformMatchersTest.java    | 4 +---
 .../beam/runners/direct/WriteWithShardingFactoryTest.java    | 2 +-
 .../core/src/main/java/org/apache/beam/sdk/io/AvroIO.java    | 2 +-
 .../src/main/java/org/apache/beam/sdk/io/FileBasedSink.java  | 2 +-
 .../src/main/java/org/apache/beam/sdk/io/TFRecordIO.java     | 2 +-
 .../core/src/main/java/org/apache/beam/sdk/io/TextIO.java    | 3 +--
 .../src/main/java/org/apache/beam/sdk/io/WriteFiles.java     | 2 +-
 .../test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java  | 2 +-
 .../src/test/java/org/apache/beam/sdk/io/SimpleSink.java     | 2 +-
 .../main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java  | 2 +-
 .../hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java | 2 +-
 .../src/main/java/org/apache/beam/sdk/io/hdfs/Write.java     | 2 +-
 .../java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java   | 2 +-
 .../src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java    | 2 +-
 .../test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java    | 8 ++++----
 15 files changed, 18 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index d9bc1e7..9754bb3 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -31,7 +31,6 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.WriteFiles;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -503,8 +502,7 @@ public class PTransformMatchersTest implements Serializable {
         WriteFiles.to(
             new FileBasedSink<Integer>("foo", "bar") {
               @Override
-              public FileBasedWriteOperation<Integer> createWriteOperation(
-                  PipelineOptions options) {
+              public FileBasedWriteOperation<Integer> createWriteOperation() {
                 return null;
               }
             });

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index b0c9f6d..960640c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -216,7 +216,7 @@ public class WriteWithShardingFactoryTest {
     public void validate(PipelineOptions options) {}
 
     @Override
-    public FileBasedWriteOperation<Object> createWriteOperation(PipelineOptions options) {
+    public FileBasedWriteOperation<Object> createWriteOperation() {
       throw new IllegalArgumentException("Should not be used");
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 24e158f..a48976f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -989,7 +989,7 @@ public class AvroIO {
     }
 
     @Override
-    public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) {
+    public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation() {
       return new AvroWriteOperation<>(this, coder, codec, metadata);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index b8ad0a6..3354c67 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -449,7 +449,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
    * Return a subclass of {@link FileBasedSink.FileBasedWriteOperation} that will manage the write
    * to the sink.
    */
-  public abstract FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options);
+  public abstract FileBasedWriteOperation<T> createWriteOperation();
 
   public void populateDisplayData(DisplayData.Builder builder) {
     getFileNamePolicy().populateDisplayData(builder);

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index a920283..1d7477b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -560,7 +560,7 @@ public class TFRecordIO {
     }
 
     @Override
-    public FileBasedWriteOperation<byte[]> createWriteOperation(PipelineOptions options) {
+    public FileBasedWriteOperation<byte[]> createWriteOperation() {
       return new TFRecordWriteOperation(this);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 90dd80f..d161d23 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -1041,8 +1041,7 @@ public class TextIO {
     }
 
     @Override
-    public FileBasedSink.FileBasedWriteOperation<String> createWriteOperation(
-        PipelineOptions options) {
+    public FileBasedSink.FileBasedWriteOperation<String> createWriteOperation() {
       return new TextWriteOperation(this, header, footer);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 2787820..ba41593 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -124,7 +124,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
         WriteFiles.class.getSimpleName());
     PipelineOptions options = input.getPipeline().getOptions();
     sink.validate(options);
-    this.writeOperation = sink.createWriteOperation(options);
+    this.writeOperation = sink.createWriteOperation();
     this.writeOperation.setWindowedWrites(windowedWrites);
     return createWrite(input);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index fe65a83..7efe47c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -531,7 +531,7 @@ public class FileBasedSinkTest {
     final String testUid = "testId";
     SimpleSink.SimpleWriteOperation writeOp =
         new SimpleSink(getBaseOutputFilename(), "txt", new DrunkWritableByteChannelFactory())
-            .createWriteOperation(null);
+            .createWriteOperation();
     final FileBasedWriter<String> writer =
         writeOp.createWriter(null);
     final String expectedFilename = IOChannelUtils.resolve(writeOp.tempDirectory.get(), testUid);

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index e3cd9b6..8caf004 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -39,7 +39,7 @@ class SimpleSink extends FileBasedSink<String> {
   }
 
   @Override
-  public SimpleWriteOperation createWriteOperation(PipelineOptions options) {
+  public SimpleWriteOperation createWriteOperation() {
     return new SimpleWriteOperation(this);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
index aa9e41e..aee73c4 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
@@ -250,7 +250,7 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
   }
 
   @Override
-  public Sink.WriteOperation<T, String> createWriteOperation(PipelineOptions options) {
+  public Sink.WriteOperation<T, String> createWriteOperation() {
     return new HDFSWriteOperation<>(this, path(), formatClass());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
index 5a3fcd9..fe2db5f 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
@@ -39,7 +39,7 @@ public abstract class Sink<T> implements Serializable, HasDisplayData {
   /**
    * Returns an instance of a {@link WriteOperation} that can write to this Sink.
    */
-  public abstract WriteOperation<T, ?> createWriteOperation(PipelineOptions options);
+  public abstract WriteOperation<T, ?> createWriteOperation();
 
   /**
    * {@inheritDoc}

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
index 8c2fc99..03e7c70 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
@@ -104,7 +104,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
         Write.class.getSimpleName());
     PipelineOptions options = input.getPipeline().getOptions();
     sink.validate(options);
-    return createWrite(input, sink.createWriteOperation(options));
+    return createWrite(input, sink.createWriteOperation());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
index aeb258f..9fa6606 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
@@ -63,7 +63,7 @@ public class HDFSFileSinkTest {
                            PipelineOptions options,
                            Iterable<T> toWrite) throws Exception {
     Sink.WriteOperation<T, String> writeOperation =
-        (Sink.WriteOperation<T, String>) sink.createWriteOperation(options);
+        (Sink.WriteOperation<T, String>) sink.createWriteOperation();
     Sink.Writer<T, String> writer = writeOperation.createWriter(options);
     writer.openUnwindowed(UUID.randomUUID().toString(),  -1, -1);
     for (T t: toWrite) {

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index a1ebf6c..8a1621e 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -53,7 +53,7 @@ class XmlSink<T> extends FileBasedSink<T> {
    * Creates an {@link XmlWriteOperation}.
    */
   @Override
-  public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) {
+  public XmlWriteOperation<T> createWriteOperation() {
     return new XmlWriteOperation<>(this);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
index bf15cfe..7f9a8c5 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
@@ -76,7 +76,7 @@ public class XmlSinkTest {
             .withRecordClass(Bird.class)
             .withRootElement("birds")
             .createSink()
-            .createWriteOperation(options);
+            .createWriteOperation();
     XmlWriter<Bird> writer = writeOp.createWriter(options);
 
     List<Bird> bundle =
@@ -97,7 +97,7 @@ public class XmlSinkTest {
             .withRootElement("birds")
             .withCharset(StandardCharsets.ISO_8859_1)
             .createSink()
-            .createWriteOperation(options);
+            .createWriteOperation();
     XmlWriter<Bird> writer = writeOp.createWriter(options);
 
     List<Bird> bundle = Lists.newArrayList(new Bird("br�che", "pin�on"));
@@ -155,7 +155,7 @@ public class XmlSinkTest {
             .withRootElement(testRootElement)
             .toFilenamePrefix(testFilePrefix)
             .createSink();
-    XmlWriteOperation<Bird> writeOp = sink.createWriteOperation(options);
+    XmlWriteOperation<Bird> writeOp = sink.createWriteOperation();
     Path outputPath = new File(testFilePrefix).toPath();
     Path tempPath = new File(writeOp.getTemporaryDirectory()).toPath();
     assertEquals(outputPath.getParent(), tempPath.getParent());
@@ -175,7 +175,7 @@ public class XmlSinkTest {
             .withRootElement(testRootElement)
             .toFilenamePrefix(testFilePrefix)
             .createSink()
-            .createWriteOperation(options);
+            .createWriteOperation();
     XmlWriter<Bird> writer = writeOp.createWriter(options);
     Path outputPath = new File(testFilePrefix).toPath();
     Path tempPath = new File(writer.getWriteOperation().getTemporaryDirectory()).toPath();