You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/11/07 19:58:53 UTC
[03/50] incubator-beam git commit: [BEAM-854] Removes ReifyTimestampsAndWindows
[BEAM-854] Removes ReifyTimestampsAndWindows
It was effectively used in 2 places:
- GatherAllPanes (used in PAssert):
  replaced with a similar custom DoFn.
- A couple of GroupByKey (GBK)-related transforms:
  folded into GroupByKeyOnly.
Also makes DirectGroupByKeyOnly (GBKO) produce globally-windowed KeyedWorkItems (KWIs).
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e77d881a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e77d881a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e77d881a
Branch: refs/heads/apex-runner
Commit: e77d881aecc3f5fdbc407a252f9924c1226bcb4a
Parents: 529f266
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Oct 28 10:40:17 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 2 10:18:59 2016 -0700
----------------------------------------------------------------------
.../core/GroupByKeyViaGroupByKeyOnly.java | 13 ++--
...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 2 -
.../beam/runners/direct/DirectGroupByKey.java | 42 ++++++-------
.../GroupAlsoByWindowEvaluatorFactory.java | 4 +-
.../direct/GroupByKeyOnlyEvaluatorFactory.java | 31 +++++-----
.../beam/runners/direct/ParDoEvaluator.java | 5 +-
.../direct/ParDoMultiEvaluatorHooks.java | 1 +
.../direct/ParDoSingleEvaluatorHooks.java | 1 +
.../direct/GroupByKeyEvaluatorFactoryTest.java | 29 ++++-----
.../GroupByKeyOnlyEvaluatorFactoryTest.java | 31 +++++-----
.../beam/runners/direct/ParDoEvaluatorTest.java | 5 +-
.../apache/beam/sdk/util/GatherAllPanes.java | 18 +++++-
.../sdk/util/ReifyTimestampAndWindowsDoFn.java | 41 -------------
.../sdk/util/ReifyTimestampsAndWindows.java | 63 --------------------
14 files changed, 94 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
index b521425..79d2252 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
@@ -31,7 +31,6 @@ import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ReifyTimestampsAndWindows;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -84,15 +83,11 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
return input
- // Make each input element's timestamp and assigned windows
- // explicit, in the value part.
- .apply(new ReifyTimestampsAndWindows<K, V>())
-
// Group by just the key.
// Combiner lifting will not happen regardless of the disallowCombinerLifting value.
// There will be no combiners right after the GroupByKeyOnly because of the two ParDos
// introduced in here.
- .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
+ .apply(new GroupByKeyOnly<K, V>())
// Sort each key's values by timestamp. GroupAlsoByWindow requires
// its input to be sorted by timestamp.
@@ -112,12 +107,12 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
* or evaluate this class.
*/
public static class GroupByKeyOnly<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
- public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
- return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+ public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(PCollection<KV<K, V>> input) {
+ return PCollection.createPrimitiveOutputInternal(
input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
index b63e23b..680a971 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.KeyedWorkItemCoder;
-import org.apache.beam.sdk.util.ReifyTimestampsAndWindows;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -52,7 +51,6 @@ class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT>
checkArgument(input.getCoder() instanceof KvCoder);
KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder();
return input
- .apply(new ReifyTimestampsAndWindows<KeyT, InputT>())
// TODO: Perhaps windowing strategy should instead be set by ReifyTAW, or by DGBKO
.setWindowingStrategyInternal(WindowingStrategy.globalDefault())
.apply(new DirectGroupByKey.DirectGroupByKeyOnly<KeyT, InputT>())
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 14103a6..219314a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -27,8 +27,6 @@ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.KeyedWorkItemCoder;
-import org.apache.beam.sdk.util.ReifyTimestampsAndWindows;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -55,13 +53,13 @@ class DirectGroupByKey<K, V>
// merging windows as needed, using the windows assigned to the
// key/value input elements and the window merge operation of the
// window function associated with the input PCollection.
- WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+ WindowingStrategy<?, ?> inputWindowingStrategy = input.getWindowingStrategy();
+ // Update the windowing strategy as appropriate.
+ WindowingStrategy<?, ?> outputWindowingStrategy =
+ original.updateWindowingStrategy(inputWindowingStrategy);
// By default, implement GroupByKey via a series of lower-level operations.
return input
- // Make each input element's timestamp and assigned windows
- // explicit, in the value part.
- .apply(new ReifyTimestampsAndWindows<K, V>())
.apply(new DirectGroupByKeyOnly<K, V>())
.setCoder(
KeyedWorkItemCoder.of(
@@ -70,20 +68,20 @@ class DirectGroupByKey<K, V>
input.getWindowingStrategy().getWindowFn().windowCoder()))
// Group each key's values by window, merging windows as needed.
- .apply("GroupAlsoByWindow", new DirectGroupAlsoByWindow<K, V>(windowingStrategy))
+ .apply(
+ "GroupAlsoByWindow",
+ new DirectGroupAlsoByWindow<K, V>(inputWindowingStrategy, outputWindowingStrategy))
- // And update the windowing strategy as appropriate.
- .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy))
.setCoder(
KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
}
static final class DirectGroupByKeyOnly<K, V>
- extends PTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>> {
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>> {
@Override
- public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, WindowedValue<V>>> input) {
- return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, V>> input) {
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
}
DirectGroupByKeyOnly() {}
@@ -92,14 +90,18 @@ class DirectGroupByKey<K, V>
static final class DirectGroupAlsoByWindow<K, V>
extends PTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>> {
- private final WindowingStrategy<?, ?> windowingStrategy;
+ private final WindowingStrategy<?, ?> inputWindowingStrategy;
+ private final WindowingStrategy<?, ?> outputWindowingStrategy;
- public DirectGroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
- this.windowingStrategy = windowingStrategy;
+ public DirectGroupAlsoByWindow(
+ WindowingStrategy<?, ?> inputWindowingStrategy,
+ WindowingStrategy<?, ?> outputWindowingStrategy) {
+ this.inputWindowingStrategy = inputWindowingStrategy;
+ this.outputWindowingStrategy = outputWindowingStrategy;
}
- public WindowingStrategy<?, ?> getWindowingStrategy() {
- return windowingStrategy;
+ public WindowingStrategy<?, ?> getInputWindowingStrategy() {
+ return inputWindowingStrategy;
}
private KeyedWorkItemCoder<K, V> getKeyedWorkItemCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
@@ -125,8 +127,8 @@ class DirectGroupByKey<K, V>
@Override
public PCollection<KV<K, Iterable<V>>> apply(PCollection<KeyedWorkItem<K, V>> input) {
- return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), outputWindowingStrategy, input.isBounded());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 4115bb7..37cc319 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -103,7 +103,8 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
@SuppressWarnings("unchecked")
WindowingStrategy<?, BoundedWindow> windowingStrategy =
- (WindowingStrategy<?, BoundedWindow>) application.getTransform().getWindowingStrategy();
+ (WindowingStrategy<?, BoundedWindow>)
+ application.getTransform().getInputWindowingStrategy();
DirectStepContext stepContext =
evaluationContext
@@ -125,6 +126,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
evaluationContext,
stepContext,
application,
+ windowingStrategy,
gabwDoFn,
Collections.<PCollectionView<?>>emptyList(),
MAIN_OUTPUT_TAG,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
index 2ead782..0fa7ebd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
@@ -67,14 +67,14 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
@Override
public void cleanup() {}
- private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(
+ private <K, V> TransformEvaluator<KV<K, V>> createEvaluator(
final AppliedPTransform<
- PCollection<KV<K, WindowedValue<V>>>,
+ PCollection<KV<K, V>>,
PCollection<KeyedWorkItem<K, V>>,
DirectGroupByKeyOnly<K, V>>
application,
- final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle) {
- return new GroupByKeyOnlyEvaluator<>(evaluationContext, inputBundle, application);
+ final CommittedBundle<KV<K, V>> inputBundle) {
+ return new GroupByKeyOnlyEvaluator<>(evaluationContext, application);
}
/**
@@ -84,12 +84,11 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
* @see GroupByKeyViaGroupByKeyOnly
*/
private static class GroupByKeyOnlyEvaluator<K, V>
- implements TransformEvaluator<KV<K, WindowedValue<V>>> {
+ implements TransformEvaluator<KV<K, V>> {
private final EvaluationContext evaluationContext;
- private final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle;
private final AppliedPTransform<
- PCollection<KV<K, WindowedValue<V>>>,
+ PCollection<KV<K, V>>,
PCollection<KeyedWorkItem<K, V>>,
DirectGroupByKeyOnly<K, V>> application;
private final Coder<K> keyCoder;
@@ -97,19 +96,17 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
public GroupByKeyOnlyEvaluator(
EvaluationContext evaluationContext,
- CommittedBundle<KV<K, WindowedValue<V>>> inputBundle,
AppliedPTransform<
- PCollection<KV<K, WindowedValue<V>>>,
- PCollection<KeyedWorkItem<K, V>>,
+ PCollection<KV<K, V>>,
+ PCollection<KeyedWorkItem<K, V>>,
DirectGroupByKeyOnly<K, V>> application) {
this.evaluationContext = evaluationContext;
- this.inputBundle = inputBundle;
this.application = application;
this.keyCoder = getKeyCoder(application.getInput().getCoder());
this.groupingMap = new HashMap<>();
}
- private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) {
+ private Coder<K> getKeyCoder(Coder<KV<K, V>> coder) {
checkState(
coder instanceof KvCoder,
"%s requires a coder of class %s."
@@ -118,13 +115,13 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
getClass().getSimpleName(),
KvCoder.class.getSimpleName());
@SuppressWarnings("unchecked")
- Coder<K> keyCoder = ((KvCoder<K, WindowedValue<V>>) coder).getKeyCoder();
+ Coder<K> keyCoder = ((KvCoder<K, V>) coder).getKeyCoder();
return keyCoder;
}
@Override
- public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) {
- KV<K, WindowedValue<V>> kv = element.getValue();
+ public void processElement(WindowedValue<KV<K, V>> element) {
+ KV<K, V> kv = element.getValue();
K key = kv.getKey();
byte[] encodedKey;
try {
@@ -139,10 +136,10 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
List<WindowedValue<V>> values = groupingMap.get(groupingKey);
if (values == null) {
- values = new ArrayList<WindowedValue<V>>();
+ values = new ArrayList<>();
groupingMap.put(groupingKey, values);
}
- values.add(kv.getValue());
+ values.add(element.withValue(kv.getValue()));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 5913379..6f91319 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -31,9 +31,11 @@ import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -44,6 +46,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
EvaluationContext evaluationContext,
DirectStepContext stepContext,
AppliedPTransform<PCollection<InputT>, ?, ?> application,
+ WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy,
Serializable fn, // may be OldDoFn or DoFn
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
@@ -70,7 +73,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
sideOutputTags,
stepContext,
aggregatorChanges,
- application.getInput().getWindowingStrategy());
+ windowingStrategy);
PushbackSideInputDoFnRunner<InputT, OutputT> runner =
PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
index a566154..f30f209 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
@@ -45,6 +45,7 @@ class ParDoMultiEvaluatorHooks<InputT, OutputT>
evaluationContext,
stepContext,
application,
+ application.getInput().getWindowingStrategy(),
fnLocal,
transform.getSideInputs(),
transform.getMainOutputTag(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
index b554f41..6d284c2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
@@ -48,6 +48,7 @@ class ParDoSingleEvaluatorHooks<InputT, OutputT>
evaluationContext,
stepContext,
application,
+ application.getInput().getWindowingStrategy(),
fnLocal,
transform.getSideInputs(),
mainOutputTag,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index 49d7d90..a726817 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.ReifyTimestampsAndWindows;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -64,13 +63,11 @@ public class GroupByKeyEvaluatorFactoryTest {
KV<String, Integer> firstBaz = KV.of("baz", Integer.MAX_VALUE);
PCollection<KV<String, Integer>> values =
p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, thirdFoo));
- PCollection<KV<String, WindowedValue<Integer>>> kvs =
- values.apply(new ReifyTimestampsAndWindows<String, Integer>());
PCollection<KeyedWorkItem<String, Integer>> groupedKvs =
- kvs.apply(new DirectGroupByKeyOnly<String, Integer>());
+ values.apply(new DirectGroupByKeyOnly<String, Integer>());
- CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle =
- bundleFactory.createBundle(kvs).commit(Instant.now());
+ CommittedBundle<KV<String, Integer>> inputBundle =
+ bundleFactory.createBundle(values).commit(Instant.now());
EvaluationContext evaluationContext = mock(EvaluationContext.class);
StructuralKey<String> fooKey = StructuralKey.of("foo", StringUtf8Coder.of());
UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle =
@@ -97,17 +94,17 @@ public class GroupByKeyEvaluatorFactoryTest {
// The input to a GroupByKey is assumed to be a KvCoder
@SuppressWarnings("unchecked")
Coder<String> keyCoder =
- ((KvCoder<String, WindowedValue<Integer>>) kvs.getCoder()).getKeyCoder();
- TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator =
+ ((KvCoder<String, Integer>) values.getCoder()).getKeyCoder();
+ TransformEvaluator<KV<String, Integer>> evaluator =
new GroupByKeyOnlyEvaluatorFactory(evaluationContext)
.forApplication(groupedKvs.getProducingTransformInternal(), inputBundle);
- evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo)));
- evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo)));
- evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(thirdFoo)));
- evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBar)));
- evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondBar)));
- evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBaz)));
+ evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo));
+ evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo));
+ evaluator.processElement(WindowedValue.valueInGlobalWindow(thirdFoo));
+ evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBar));
+ evaluator.processElement(WindowedValue.valueInGlobalWindow(secondBar));
+ evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBaz));
evaluator.finishBundle();
@@ -142,10 +139,6 @@ public class GroupByKeyEvaluatorFactoryTest {
keyCoder)));
}
- private <K, V> KV<K, WindowedValue<V>> gwValue(KV<K, V> kv) {
- return KV.of(kv.getKey(), WindowedValue.valueInGlobalWindow(kv.getValue()));
- }
-
private static class KeyedWorkItemMatcher<K, V>
extends BaseMatcher<WindowedValue<KeyedWorkItem<K, V>>> {
private final KeyedWorkItem<K, V> myWorkItem;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
index 3b9dc39..3e5af14 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.ReifyTimestampsAndWindows;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -64,13 +63,11 @@ public class GroupByKeyOnlyEvaluatorFactoryTest {
KV<String, Integer> firstBaz = KV.of("baz", Integer.MAX_VALUE);
PCollection<KV<String, Integer>> values =
p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, thirdFoo));
- PCollection<KV<String, WindowedValue<Integer>>> kvs =
- values.apply(new ReifyTimestampsAndWindows<String, Integer>());
PCollection<KeyedWorkItem<String, Integer>> groupedKvs =
- kvs.apply(new DirectGroupByKeyOnly<String, Integer>());
+ values.apply(new DirectGroupByKeyOnly<String, Integer>());
- CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle =
- bundleFactory.createBundle(kvs).commit(Instant.now());
+ CommittedBundle<KV<String, Integer>> inputBundle =
+ bundleFactory.createBundle(values).commit(Instant.now());
EvaluationContext evaluationContext = mock(EvaluationContext.class);
StructuralKey<String> fooKey = StructuralKey.of("foo", StringUtf8Coder.of());
@@ -90,25 +87,25 @@ public class GroupByKeyOnlyEvaluatorFactoryTest {
// The input to a GroupByKey is assumed to be a KvCoder
@SuppressWarnings("unchecked")
Coder<String> keyCoder =
- ((KvCoder<String, WindowedValue<Integer>>) kvs.getCoder()).getKeyCoder();
- TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator =
+ ((KvCoder<String, Integer>) values.getCoder()).getKeyCoder();
+ TransformEvaluator<KV<String, Integer>> evaluator =
new GroupByKeyOnlyEvaluatorFactory(evaluationContext)
.forApplication(
groupedKvs.getProducingTransformInternal(), inputBundle);
- evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo)));
- evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo)));
- evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(thirdFoo)));
- evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBar)));
- evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondBar)));
- evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBaz)));
+ evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo));
+ evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo));
+ evaluator.processElement(WindowedValue.valueInGlobalWindow(thirdFoo));
+ evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBar));
+ evaluator.processElement(WindowedValue.valueInGlobalWindow(secondBar));
+ evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBaz));
evaluator.finishBundle();
assertThat(
fooBundle.commit(Instant.now()).getElements(),
contains(
- new KeyedWorkItemMatcher<String, Integer>(
+ new KeyedWorkItemMatcher<>(
KeyedWorkItems.elementsWorkItem(
"foo",
ImmutableSet.of(
@@ -119,7 +116,7 @@ public class GroupByKeyOnlyEvaluatorFactoryTest {
assertThat(
barBundle.commit(Instant.now()).getElements(),
contains(
- new KeyedWorkItemMatcher<String, Integer>(
+ new KeyedWorkItemMatcher<>(
KeyedWorkItems.elementsWorkItem(
"bar",
ImmutableSet.of(
@@ -129,7 +126,7 @@ public class GroupByKeyOnlyEvaluatorFactoryTest {
assertThat(
bazBundle.commit(Instant.now()).getElements(),
contains(
- new KeyedWorkItemMatcher<String, Integer>(
+ new KeyedWorkItemMatcher<>(
KeyedWorkItems.elementsWorkItem(
"baz",
ImmutableSet.of(WindowedValue.valueInGlobalWindow(Integer.MAX_VALUE))),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 8254413..eab92f4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -152,10 +152,13 @@ public class ParDoEvaluatorTest {
when(evaluationContext.getAggregatorContainer()).thenReturn(container);
when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
+ AppliedPTransform<PCollection<Integer>, ?, ?> transform =
+ (AppliedPTransform<PCollection<Integer>, ?, ?>) output.getProducingTransformInternal();
return ParDoEvaluator.create(
evaluationContext,
stepContext,
- (AppliedPTransform<PCollection<Integer>, ?, ?>) output.getProducingTransformInternal(),
+ transform,
+ transform.getInput().getWindowingStrategy(),
fn,
ImmutableList.<PCollectionView<?>>of(singletonView),
mainOutputTag,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
index a2a6e17..52a2ba8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
@@ -17,10 +17,13 @@
*/
package org.apache.beam.sdk.util;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -55,8 +58,12 @@ public class GatherAllPanes<T>
WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();
return input
- .apply(WithKeys.<Integer, T>of(0).withKeyType(new TypeDescriptor<Integer>() {}))
- .apply(new ReifyTimestampsAndWindows<Integer, T>())
+ .apply(ParDo.of(new ReifyTimestampsAndWindowsFn<T>()))
+ .setCoder(
+ WindowedValue.FullWindowedValueCoder.of(
+ input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()))
+ .apply(
+ WithKeys.<Integer, WindowedValue<T>>of(0).withKeyType(new TypeDescriptor<Integer>() {}))
.apply(
Window.into(
new IdentityWindowFn<KV<Integer, WindowedValue<T>>>(
@@ -69,4 +76,11 @@ public class GatherAllPanes<T>
.apply(Values.<Iterable<WindowedValue<T>>>create())
.setWindowingStrategyInternal(input.getWindowingStrategy());
}
+
+ private static class ReifyTimestampsAndWindowsFn<T> extends DoFn<T, WindowedValue<T>> {
+ @DoFn.ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) {
+ c.output(WindowedValue.of(c.element(), c.timestamp(), window, c.pane()));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
deleted file mode 100644
index 6da4da0..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * {@link OldDoFn} that makes timestamps and window assignments explicit in the value part of each
- * key/value pair.
- *
- * @param <K> the type of the keys of the input and output {@code PCollection}s
- * @param <V> the type of the values of the input {@code PCollection}
- */
-@SystemDoFnInternal
-public class ReifyTimestampAndWindowsDoFn<K, V> extends OldDoFn<KV<K, V>, KV<K, WindowedValue<V>>>
- implements RequiresWindowAccess {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- KV<K, V> kv = c.element();
- K key = kv.getKey();
- V value = kv.getValue();
- c.output(KV.of(key, WindowedValue.of(value, c.timestamp(), c.window(), c.pane())));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java
deleted file mode 100644
index d129c8e..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Helper transform that makes timestamps and window assignments explicit in the value part of
- * each key/value pair.
- */
-public class ReifyTimestampsAndWindows<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, WindowedValue<V>>>> {
-
- @Override
- public PCollection<KV<K, WindowedValue<V>>> apply(PCollection<KV<K, V>> input) {
-
- // The requirement to use a KvCoder *is* actually a model-level requirement, not specific
- // to this implementation of GBK. All runners need a way to get the key.
- checkArgument(
- input.getCoder() instanceof KvCoder,
- "%s requires its input to use a %s",
- GroupByKey.class.getSimpleName(),
- KvCoder.class.getSimpleName());
-
- @SuppressWarnings("unchecked")
- KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();
- Coder<K> keyCoder = inputKvCoder.getKeyCoder();
- Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
- Coder<WindowedValue<V>> outputValueCoder =
- FullWindowedValueCoder.of(
- inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
- Coder<KV<K, WindowedValue<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
- return input
- .apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()))
- .setCoder(outputKvCoder);
- }
-}