You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/29 19:06:44 UTC
[2/5] beam git commit: Remove PipelineOptions from createWriteOperation()
Remove PipelineOptions from createWriteOperation()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4291fa6d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4291fa6d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4291fa6d
Branch: refs/heads/master
Commit: 4291fa6ddc75a7142cacb39025b613eca54c48c3
Parents: f6ebb04
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Apr 24 13:57:23 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Apr 29 10:41:33 2017 -0700
----------------------------------------------------------------------
.../runners/core/construction/PTransformMatchersTest.java | 4 +---
.../beam/runners/direct/WriteWithShardingFactoryTest.java | 2 +-
.../core/src/main/java/org/apache/beam/sdk/io/AvroIO.java | 2 +-
.../src/main/java/org/apache/beam/sdk/io/FileBasedSink.java | 2 +-
.../src/main/java/org/apache/beam/sdk/io/TFRecordIO.java | 2 +-
.../core/src/main/java/org/apache/beam/sdk/io/TextIO.java | 3 +--
.../src/main/java/org/apache/beam/sdk/io/WriteFiles.java | 2 +-
.../test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java | 2 +-
.../src/test/java/org/apache/beam/sdk/io/SimpleSink.java | 2 +-
.../main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java | 2 +-
.../hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java | 2 +-
.../src/main/java/org/apache/beam/sdk/io/hdfs/Write.java | 2 +-
.../java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java | 2 +-
.../src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java | 2 +-
.../test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java | 8 ++++----
15 files changed, 18 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index d9bc1e7..9754bb3 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -31,7 +31,6 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.WriteFiles;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -503,8 +502,7 @@ public class PTransformMatchersTest implements Serializable {
WriteFiles.to(
new FileBasedSink<Integer>("foo", "bar") {
@Override
- public FileBasedWriteOperation<Integer> createWriteOperation(
- PipelineOptions options) {
+ public FileBasedWriteOperation<Integer> createWriteOperation() {
return null;
}
});
http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index b0c9f6d..960640c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -216,7 +216,7 @@ public class WriteWithShardingFactoryTest {
public void validate(PipelineOptions options) {}
@Override
- public FileBasedWriteOperation<Object> createWriteOperation(PipelineOptions options) {
+ public FileBasedWriteOperation<Object> createWriteOperation() {
throw new IllegalArgumentException("Should not be used");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 24e158f..a48976f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -989,7 +989,7 @@ public class AvroIO {
}
@Override
- public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) {
+ public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation() {
return new AvroWriteOperation<>(this, coder, codec, metadata);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index b8ad0a6..3354c67 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -449,7 +449,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* Return a subclass of {@link FileBasedSink.FileBasedWriteOperation} that will manage the write
* to the sink.
*/
- public abstract FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options);
+ public abstract FileBasedWriteOperation<T> createWriteOperation();
public void populateDisplayData(DisplayData.Builder builder) {
getFileNamePolicy().populateDisplayData(builder);
http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index a920283..1d7477b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -560,7 +560,7 @@ public class TFRecordIO {
}
@Override
- public FileBasedWriteOperation<byte[]> createWriteOperation(PipelineOptions options) {
+ public FileBasedWriteOperation<byte[]> createWriteOperation() {
return new TFRecordWriteOperation(this);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 90dd80f..d161d23 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -1041,8 +1041,7 @@ public class TextIO {
}
@Override
- public FileBasedSink.FileBasedWriteOperation<String> createWriteOperation(
- PipelineOptions options) {
+ public FileBasedSink.FileBasedWriteOperation<String> createWriteOperation() {
return new TextWriteOperation(this, header, footer);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 2787820..ba41593 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -124,7 +124,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
WriteFiles.class.getSimpleName());
PipelineOptions options = input.getPipeline().getOptions();
sink.validate(options);
- this.writeOperation = sink.createWriteOperation(options);
+ this.writeOperation = sink.createWriteOperation();
this.writeOperation.setWindowedWrites(windowedWrites);
return createWrite(input);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index fe65a83..7efe47c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -531,7 +531,7 @@ public class FileBasedSinkTest {
final String testUid = "testId";
SimpleSink.SimpleWriteOperation writeOp =
new SimpleSink(getBaseOutputFilename(), "txt", new DrunkWritableByteChannelFactory())
- .createWriteOperation(null);
+ .createWriteOperation();
final FileBasedWriter<String> writer =
writeOp.createWriter(null);
final String expectedFilename = IOChannelUtils.resolve(writeOp.tempDirectory.get(), testUid);
http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index e3cd9b6..8caf004 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -39,7 +39,7 @@ class SimpleSink extends FileBasedSink<String> {
}
@Override
- public SimpleWriteOperation createWriteOperation(PipelineOptions options) {
+ public SimpleWriteOperation createWriteOperation() {
return new SimpleWriteOperation(this);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
index aa9e41e..aee73c4 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
@@ -250,7 +250,7 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
}
@Override
- public Sink.WriteOperation<T, String> createWriteOperation(PipelineOptions options) {
+ public Sink.WriteOperation<T, String> createWriteOperation() {
return new HDFSWriteOperation<>(this, path(), formatClass());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
index 5a3fcd9..fe2db5f 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
@@ -39,7 +39,7 @@ public abstract class Sink<T> implements Serializable, HasDisplayData {
/**
* Returns an instance of a {@link WriteOperation} that can write to this Sink.
*/
- public abstract WriteOperation<T, ?> createWriteOperation(PipelineOptions options);
+ public abstract WriteOperation<T, ?> createWriteOperation();
/**
* {@inheritDoc}
http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
index 8c2fc99..03e7c70 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
@@ -104,7 +104,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
Write.class.getSimpleName());
PipelineOptions options = input.getPipeline().getOptions();
sink.validate(options);
- return createWrite(input, sink.createWriteOperation(options));
+ return createWrite(input, sink.createWriteOperation());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
index aeb258f..9fa6606 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
@@ -63,7 +63,7 @@ public class HDFSFileSinkTest {
PipelineOptions options,
Iterable<T> toWrite) throws Exception {
Sink.WriteOperation<T, String> writeOperation =
- (Sink.WriteOperation<T, String>) sink.createWriteOperation(options);
+ (Sink.WriteOperation<T, String>) sink.createWriteOperation();
Sink.Writer<T, String> writer = writeOperation.createWriter(options);
writer.openUnwindowed(UUID.randomUUID().toString(), -1, -1);
for (T t: toWrite) {
http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index a1ebf6c..8a1621e 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -53,7 +53,7 @@ class XmlSink<T> extends FileBasedSink<T> {
* Creates an {@link XmlWriteOperation}.
*/
@Override
- public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) {
+ public XmlWriteOperation<T> createWriteOperation() {
return new XmlWriteOperation<>(this);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
index bf15cfe..7f9a8c5 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
@@ -76,7 +76,7 @@ public class XmlSinkTest {
.withRecordClass(Bird.class)
.withRootElement("birds")
.createSink()
- .createWriteOperation(options);
+ .createWriteOperation();
XmlWriter<Bird> writer = writeOp.createWriter(options);
List<Bird> bundle =
@@ -97,7 +97,7 @@ public class XmlSinkTest {
.withRootElement("birds")
.withCharset(StandardCharsets.ISO_8859_1)
.createSink()
- .createWriteOperation(options);
+ .createWriteOperation();
XmlWriter<Bird> writer = writeOp.createWriter(options);
List<Bird> bundle = Lists.newArrayList(new Bird("bréche", "pinçon"));
@@ -155,7 +155,7 @@ public class XmlSinkTest {
.withRootElement(testRootElement)
.toFilenamePrefix(testFilePrefix)
.createSink();
- XmlWriteOperation<Bird> writeOp = sink.createWriteOperation(options);
+ XmlWriteOperation<Bird> writeOp = sink.createWriteOperation();
Path outputPath = new File(testFilePrefix).toPath();
Path tempPath = new File(writeOp.getTemporaryDirectory()).toPath();
assertEquals(outputPath.getParent(), tempPath.getParent());
@@ -175,7 +175,7 @@ public class XmlSinkTest {
.withRootElement(testRootElement)
.toFilenamePrefix(testFilePrefix)
.createSink()
- .createWriteOperation(options);
+ .createWriteOperation();
XmlWriter<Bird> writer = writeOp.createWriter(options);
Path outputPath = new File(testFilePrefix).toPath();
Path tempPath = new File(writer.getWriteOperation().getTemporaryDirectory()).toPath();