You are viewing a plain-text version of this content. The canonical (linked) version is available on the original archive page.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/15 03:02:41 UTC
[1/2] incubator-beam git commit: Closes #181
Repository: incubator-beam
Updated Branches:
refs/heads/master 5bdea1e2b -> 8dc9032ce
Closes #181
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8dc9032c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8dc9032c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8dc9032c
Branch: refs/heads/master
Commit: 8dc9032ce2cf870340dd922bc2643e48e84d1d5f
Parents: 5bdea1e b15424c
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 14 18:02:07 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 14 18:02:07 2016 -0700
----------------------------------------------------------------------
.../main/java/org/apache/beam/sdk/io/Write.java | 9 +-
.../java/org/apache/beam/sdk/io/WriteTest.java | 95 ++++++++++++++++----
2 files changed, 80 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: [BEAM-188] Write: apply GlobalWindows first
Posted by dh...@apache.org.
[BEAM-188] Write: apply GlobalWindows first
Also, do not supply an explicit timestamp when outputting writer results (use c.output instead of c.outputWithTimestamp).
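Note that this is safe because the functions in the Writer cannot access the element's window or
timestamp, so re-windowing the input into the global window before writing does not change what they
produce. When we add per-window or similar functions to the sinks, we will likely do so at a higher level.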
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b15424ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b15424ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b15424ce
Branch: refs/heads/master
Commit: b15424cede856230d62ad102fa5a586d8f5e10ca
Parents: 5bdea1e
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 14 17:23:42 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 14 18:02:07 2016 -0700
----------------------------------------------------------------------
.../main/java/org/apache/beam/sdk/io/Write.java | 9 +-
.../java/org/apache/beam/sdk/io/WriteTest.java | 95 ++++++++++++++++----
2 files changed, 80 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15424ce/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index a8a2517..67761cd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -35,8 +35,6 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
-import org.joda.time.Instant;
-
import java.util.UUID;
/**
@@ -154,6 +152,7 @@ public class Write {
// There is a dependency between this ParDo and the first (the WriteOperation PCollection
// as a side input), so this will happen after the initial ParDo.
PCollection<WriteT> results = input
+ .apply(Window.<T>into(new GlobalWindows()))
.apply("WriteBundles", ParDo.of(new DoFn<T, WriteT>() {
// Writer that will write the records in this bundle. Lazily
// initialized in processElement.
@@ -184,13 +183,11 @@ public class Write {
public void finishBundle(Context c) throws Exception {
if (writer != null) {
WriteT result = writer.close();
- // Output the result of the write.
- c.outputWithTimestamp(result, Instant.now());
+ c.output(result);
}
}
}).withSideInputs(writeOperationView))
- .setCoder(writeOperation.getWriterResultCoder())
- .apply(Window.<WriteT>into(new GlobalWindows()));
+ .setCoder(writeOperation.getWriterResultCoder());
final PCollectionView<Iterable<WriteT>> resultsView =
results.apply(View.<WriteT>asIterable());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15424ce/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 865dbe1..af0676a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -35,8 +35,16 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import com.google.common.base.MoreObjects;
@@ -54,6 +62,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
/**
* Tests for the Write PTransform.
@@ -61,7 +70,47 @@ import java.util.UUID;
@RunWith(JUnit4.class)
public class WriteTest {
// Static store that can be accessed within the writer
- static List<String> sinkContents = new ArrayList<>();
+ private static List<String> sinkContents = new ArrayList<>();
+
+ private static final MapElements<String, String> IDENTITY_MAP =
+ MapElements.via(new SimpleFunction<String, String>() {
+ @Override
+ public String apply(String input) {
+ return input;
+ }
+ });
+
+ private static class WindowAndReshuffle<T> extends PTransform<PCollection<T>, PCollection<T>> {
+ private final Window.Bound<T> window;
+ public WindowAndReshuffle(Window.Bound<T> window) {
+ this.window = window;
+ }
+
+ private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
+ }
+ }
+
+ private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ for (T s : c.element().getValue()) {
+ c.output(s);
+ }
+ }
+ }
+
+ @Override
+ public PCollection<T> apply(PCollection<T> input) {
+ return input
+ .apply(window)
+ .apply(ParDo.of(new AddArbitraryKey<T>()))
+ .apply(GroupByKey.<Integer, T>create())
+ .apply(ParDo.of(new RemoveArbitraryKey<T>()));
+ }
+ }
/**
* Test a Write transform with a PCollection of elements.
@@ -70,7 +119,7 @@ public class WriteTest {
public void testWrite() {
List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
"Intimidating pigeon", "Pedantic gull", "Frisky finch");
- runWrite(inputs, /* not windowed */ false);
+ runWrite(inputs, IDENTITY_MAP);
}
/**
@@ -79,7 +128,7 @@ public class WriteTest {
@Test
public void testWriteWithEmptyPCollection() {
List<String> inputs = new ArrayList<>();
- runWrite(inputs, /* not windowed */ false);
+ runWrite(inputs, IDENTITY_MAP);
}
/**
@@ -89,7 +138,21 @@ public class WriteTest {
public void testWriteWindowed() {
List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
"Intimidating pigeon", "Pedantic gull", "Frisky finch");
- runWrite(inputs, /* windowed */ true);
+ runWrite(
+ inputs, new WindowAndReshuffle(Window.<String>into(FixedWindows.of(Duration.millis(2)))));
+ }
+
+ /**
+ * Test a Write with sessions.
+ */
+ @Test
+ public void testWriteWithSessions() {
+ List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
+ "Intimidating pigeon", "Pedantic gull", "Frisky finch");
+
+ runWrite(
+ inputs,
+ new WindowAndReshuffle(Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))));
}
/**
@@ -97,7 +160,8 @@ public class WriteTest {
* a test sink in the correct order, as well as verifies that the elements of a PCollection are
* written to the sink.
*/
- public void runWrite(List<String> inputs, boolean windowed) {
+ private static void runWrite(
+ List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform) {
// Flag to validate that the pipeline options are passed to the Sink
String[] args = {"--testFlag=test_value"};
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(WriteOptions.class);
@@ -106,21 +170,16 @@ public class WriteTest {
// Clear the sink's contents.
sinkContents.clear();
- // Construct the input PCollection and test Sink.
- PCollection<String> input;
- if (windowed) {
- List<Long> timestamps = new ArrayList<>();
- for (long i = 0; i < inputs.size(); i++) {
- timestamps.add(i + 1);
- }
- input = p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
- .apply(Window.<String>into(FixedWindows.of(new Duration(2))));
- } else {
- input = p.apply(Create.of(inputs).withCoder(StringUtf8Coder.of()));
+ // Prepare timestamps for the elements.
+ List<Long> timestamps = new ArrayList<>();
+ for (long i = 0; i < inputs.size(); i++) {
+ timestamps.add(i + 1);
}
- TestSink sink = new TestSink();
- input.apply(Write.to(sink));
+ TestSink sink = new TestSink();
+ p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
+ .apply(transform)
+ .apply(Write.to(sink));
p.run();
assertThat(sinkContents, containsInAnyOrder(inputs.toArray()));