You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/05 22:55:20 UTC
[2/3] incubator-beam git commit: Moved KeyedWorkItem and related
classes to runners-core
Moved KeyedWorkItem and related classes to runners-core
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8a2f020f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8a2f020f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8a2f020f
Branch: refs/heads/master
Commit: 8a2f020f9781340e60609a5a8ec537871ae29570
Parents: 81d1295
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Nov 30 20:46:04 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Dec 5 14:55:02 2016 -0800
----------------------------------------------------------------------
.../operators/ApexGroupByKeyOperator.java | 4 +-
.../apache/beam/runners/core/DoFnRunner.java | 1 -
.../apache/beam/runners/core/DoFnRunners.java | 1 -
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 1 -
.../apache/beam/runners/core/KeyedWorkItem.java | 44 +++++++
.../beam/runners/core/KeyedWorkItemCoder.java | 130 +++++++++++++++++++
.../beam/runners/core/KeyedWorkItems.java | 122 +++++++++++++++++
.../core/LateDataDroppingDoFnRunner.java | 2 -
.../beam/runners/core/SplittableParDo.java | 2 -
.../runners/core/KeyedWorkItemCoderTest.java | 64 +++++++++
.../beam/runners/core/SplittableParDoTest.java | 2 -
...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 2 +-
.../beam/runners/direct/DirectGroupByKey.java | 4 +-
.../direct/ExecutorServiceParallelExecutor.java | 4 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 2 +-
.../direct/GroupByKeyOnlyEvaluatorFactory.java | 4 +-
...littableProcessElementsEvaluatorFactory.java | 2 +-
.../direct/GroupByKeyEvaluatorFactoryTest.java | 4 +-
.../GroupByKeyOnlyEvaluatorFactoryTest.java | 4 +-
.../streaming/SingletonKeyedWorkItem.java | 2 +-
.../streaming/SingletonKeyedWorkItemCoder.java | 4 +-
.../wrappers/streaming/WindowDoFnOperator.java | 6 +-
.../wrappers/streaming/WorkItemKeySelector.java | 2 +-
.../org/apache/beam/sdk/util/KeyedWorkItem.java | 43 ------
.../beam/sdk/util/KeyedWorkItemCoder.java | 128 ------------------
.../apache/beam/sdk/util/KeyedWorkItems.java | 121 -----------------
.../beam/sdk/util/KeyedWorkItemCoderTest.java | 62 ---------
27 files changed, 381 insertions(+), 386 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index f49c785..48ac177 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -42,6 +42,8 @@ import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -54,8 +56,6 @@ import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index c84122b..aac8e8f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext;
-import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 3840423..da16573 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ExecutionContext.StepContext;
-import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 8b10813..2082269 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java
new file mode 100644
index 0000000..c75fc25
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * Interface that contains all the timers and elements associated with a specific work item.
+ *
+ * @param <K> the key type
+ * @param <ElemT> the element type
+ */
+public interface KeyedWorkItem<K, ElemT> {
+ /**
+ * Returns the key.
+ */
+ K key();
+
+ /**
+ * Returns an iterable containing the timers.
+ */
+ Iterable<TimerData> timersIterable();
+
+ /**
+ * Returns an iterable containing the elements.
+ */
+ Iterable<WindowedValue<ElemT>> elementsIterable();
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
new file mode 100644
index 0000000..95be047
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.TimerInternals.TimerDataCoder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+
+/**
+ * A {@link Coder} for {@link KeyedWorkItem KeyedWorkItems}.
+ */
+public class KeyedWorkItemCoder<K, ElemT> extends StandardCoder<KeyedWorkItem<K, ElemT>> {
+ /**
+ * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
+ * coder.
+ */
+ public static <K, ElemT> KeyedWorkItemCoder<K, ElemT> of(
+ Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) {
+ return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
+ }
+
+ @JsonCreator
+ public static <K, ElemT> KeyedWorkItemCoder<K, ElemT> of(
+ @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
+ checkArgument(components.size() == 3, "Expecting 3 components, got %s", components.size());
+ @SuppressWarnings("unchecked")
+ Coder<K> keyCoder = (Coder<K>) components.get(0);
+ @SuppressWarnings("unchecked")
+ Coder<ElemT> elemCoder = (Coder<ElemT>) components.get(1);
+ @SuppressWarnings("unchecked")
+ Coder<? extends BoundedWindow> windowCoder = (Coder<? extends BoundedWindow>) components.get(2);
+ return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
+ }
+
+ private final Coder<K> keyCoder;
+ private final Coder<ElemT> elemCoder;
+ private final Coder<? extends BoundedWindow> windowCoder;
+ private final Coder<Iterable<TimerData>> timersCoder;
+ private final Coder<Iterable<WindowedValue<ElemT>>> elemsCoder;
+
+ private KeyedWorkItemCoder(
+ Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) {
+ this.keyCoder = keyCoder;
+ this.elemCoder = elemCoder;
+ this.windowCoder = windowCoder;
+ this.timersCoder = IterableCoder.of(TimerDataCoder.of(windowCoder));
+ this.elemsCoder = IterableCoder.of(FullWindowedValueCoder.of(elemCoder, windowCoder));
+ }
+
+ public Coder<K> getKeyCoder() {
+ return keyCoder;
+ }
+
+ public Coder<ElemT> getElementCoder() {
+ return elemCoder;
+ }
+
+ @Override
+ public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream, Coder.Context context)
+ throws CoderException, IOException {
+ Coder.Context nestedContext = context.nested();
+ keyCoder.encode(value.key(), outStream, nestedContext);
+ timersCoder.encode(value.timersIterable(), outStream, nestedContext);
+ elemsCoder.encode(value.elementsIterable(), outStream, nestedContext);
+ }
+
+ @Override
+ public KeyedWorkItem<K, ElemT> decode(InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ Coder.Context nestedContext = context.nested();
+ K key = keyCoder.decode(inStream, nestedContext);
+ Iterable<TimerData> timers = timersCoder.decode(inStream, nestedContext);
+ Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, nestedContext);
+ return KeyedWorkItems.workItem(key, timers, elems);
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return ImmutableList.of(keyCoder, elemCoder, windowCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() throws Coder.NonDeterministicException {
+ keyCoder.verifyDeterministic();
+ timersCoder.verifyDeterministic();
+ elemsCoder.verifyDeterministic();
+ }
+
+ /**
+ * {@inheritDoc}.
+ *
+ * {@link KeyedWorkItemCoder} is not consistent with equals as it can return a
+ * {@link KeyedWorkItem} of a type different from the originally encoded type.
+ */
+ @Override
+ public boolean consistentWithEquals() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java
new file mode 100644
index 0000000..94c3bb6
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Iterables;
+import java.util.Collections;
+import java.util.Objects;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * Static utility methods that provide {@link KeyedWorkItem} implementations.
+ */
+public class KeyedWorkItems {
+ /**
+ * Returns an implementation of {@link KeyedWorkItem} that wraps around an elements iterable.
+ *
+ * @param <K> the key type
+ * @param <ElemT> the element type
+ */
+ public static <K, ElemT> KeyedWorkItem<K, ElemT> elementsWorkItem(
+ K key, Iterable<WindowedValue<ElemT>> elementsIterable) {
+ return new ComposedKeyedWorkItem<>(key, Collections.<TimerData>emptyList(), elementsIterable);
+ }
+
+ /**
+ * Returns an implementation of {@link KeyedWorkItem} that wraps around an timers iterable.
+ *
+ * @param <K> the key type
+ * @param <ElemT> the element type
+ */
+ public static <K, ElemT> KeyedWorkItem<K, ElemT> timersWorkItem(
+ K key, Iterable<TimerData> timersIterable) {
+ return new ComposedKeyedWorkItem<>(
+ key, timersIterable, Collections.<WindowedValue<ElemT>>emptyList());
+ }
+
+ /**
+ * Returns an implementation of {@link KeyedWorkItem} that wraps around
+ * an timers iterable and an elements iterable.
+ *
+ * @param <K> the key type
+ * @param <ElemT> the element type
+ */
+ public static <K, ElemT> KeyedWorkItem<K, ElemT> workItem(
+ K key, Iterable<TimerData> timersIterable, Iterable<WindowedValue<ElemT>> elementsIterable) {
+ return new ComposedKeyedWorkItem<>(key, timersIterable, elementsIterable);
+ }
+
+ /**
+ * A {@link KeyedWorkItem} composed of an underlying key, {@link TimerData} iterable, and element
+ * iterable.
+ */
+ public static class ComposedKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
+ private final K key;
+ private final Iterable<TimerData> timers;
+ private final Iterable<WindowedValue<ElemT>> elements;
+
+ private ComposedKeyedWorkItem(
+ K key, Iterable<TimerData> timers, Iterable<WindowedValue<ElemT>> elements) {
+ this.key = key;
+ this.timers = timers;
+ this.elements = elements;
+ }
+
+ @Override
+ public K key() {
+ return key;
+ }
+
+ @Override
+ public Iterable<TimerData> timersIterable() {
+ return timers;
+ }
+
+ @Override
+ public Iterable<WindowedValue<ElemT>> elementsIterable() {
+ return elements;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null || !(other instanceof ComposedKeyedWorkItem)) {
+ return false;
+ }
+ KeyedWorkItem<?, ?> that = (KeyedWorkItem<?, ?>) other;
+ return Objects.equals(this.key, that.key())
+ && Iterables.elementsEqual(this.timersIterable(), that.timersIterable())
+ && Iterables.elementsEqual(this.elementsIterable(), that.elementsIterable());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, timers, elements);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(ComposedKeyedWorkItem.class)
+ .add("key", key)
+ .add("elements", elements)
+ .add("timers", timers)
+ .toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 63a80d2..b6f700f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -24,8 +24,6 @@ import com.google.common.collect.Iterables;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 80fd17b..a633111 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -49,8 +49,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItemCoder;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerInternals;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
new file mode 100644
index 0000000..37fabdd
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link KeyedWorkItems}.
+ */
+@RunWith(JUnit4.class)
+public class KeyedWorkItemCoderTest {
+ @Test
+ public void testCoderProperties() throws Exception {
+ CoderProperties.coderSerializable(
+ KeyedWorkItemCoder.of(StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE));
+ }
+
+ @Test
+ public void testEncodeDecodeEqual() throws Exception {
+ Iterable<TimerData> timers =
+ ImmutableList.<TimerData>of(
+ TimerData.of(StateNamespaces.global(), new Instant(500L), TimeDomain.EVENT_TIME));
+ Iterable<WindowedValue<Integer>> elements =
+ ImmutableList.of(
+ WindowedValue.valueInGlobalWindow(1),
+ WindowedValue.valueInGlobalWindow(4),
+ WindowedValue.valueInGlobalWindow(8));
+
+ KeyedWorkItemCoder<String, Integer> coder =
+ KeyedWorkItemCoder.of(StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE);
+
+ CoderProperties.coderDecodeEncodeEqual(coder, KeyedWorkItems.workItem("foo", timers, elements));
+ CoderProperties.coderDecodeEncodeEqual(coder, KeyedWorkItems.elementsWorkItem("foo", elements));
+ CoderProperties.coderDecodeEncodeEqual(
+ coder, KeyedWorkItems.<String, Integer>timersWorkItem("foo", timers));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index b13d839..cf96b66 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -46,8 +46,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.state.StateInternals;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
index 04becd7..1fa059c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -17,9 +17,9 @@
*/
package org.apache.beam.runners.direct;
+import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index efee801..21776e7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -20,13 +20,13 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkArgument;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItemCoder;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 929d09d..a308295 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -43,13 +43,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 9d25bc6..5c6b2c1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -28,6 +28,7 @@ import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.SystemReduceFn;
@@ -44,7 +45,6 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
index 4d691ea..20d619f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
@@ -36,8 +38,6 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 0eca710..aae1149 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -20,13 +20,13 @@ package org.apache.beam.runners.direct;
import java.util.Collection;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.ElementAndRestriction;
+import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.state.StateInternals;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index a726817..7ba38ce 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -25,6 +25,8 @@ import static org.mockito.Mockito.when;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multiset;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
@@ -33,8 +35,6 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
index 3e5af14..23340c6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
@@ -25,6 +25,8 @@ import static org.mockito.Mockito.when;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multiset;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
@@ -33,8 +35,6 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
index 6d2582b..b53658e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.flink.translation.wrappers.streaming;
import java.util.Collections;
-import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
index 37454a3..ad30688 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -26,12 +26,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItemCoder;
import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 432dc64..f2d7f1c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -39,6 +39,8 @@ import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
import org.apache.beam.sdk.coders.Coder;
@@ -47,8 +49,6 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ExecutionContext;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
@@ -59,8 +59,6 @@ import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
-
-
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
index 7829163..1dff367 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
@@ -18,9 +18,9 @@
package org.apache.beam.runners.flink.translation.wrappers.streaming;
import java.nio.ByteBuffer;
+import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItem.java
deleted file mode 100644
index b273466..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItem.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-
-/**
- * Interface that contains all the timers and elements associated with a specific work item.
- *
- * @param <K> the key type
- * @param <ElemT> the element type
- */
-public interface KeyedWorkItem<K, ElemT> {
- /**
- * Returns the key.
- */
- K key();
-
- /**
- * Returns an iterable containing the timers.
- */
- Iterable<TimerData> timersIterable();
-
- /**
- * Returns an iterable containing the elements.
- */
- Iterable<WindowedValue<ElemT>> elementsIterable();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java
deleted file mode 100644
index a6e3d6c..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.TimerInternals.TimerDataCoder;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-
-/**
- * A {@link Coder} for {@link KeyedWorkItem KeyedWorkItems}.
- */
-public class KeyedWorkItemCoder<K, ElemT> extends StandardCoder<KeyedWorkItem<K, ElemT>> {
- /**
- * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
- * coder.
- */
- public static <K, ElemT> KeyedWorkItemCoder<K, ElemT> of(
- Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) {
- return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
- }
-
- @JsonCreator
- public static <K, ElemT> KeyedWorkItemCoder<K, ElemT> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
- checkArgument(components.size() == 3, "Expecting 3 components, got %s", components.size());
- @SuppressWarnings("unchecked")
- Coder<K> keyCoder = (Coder<K>) components.get(0);
- @SuppressWarnings("unchecked")
- Coder<ElemT> elemCoder = (Coder<ElemT>) components.get(1);
- @SuppressWarnings("unchecked")
- Coder<? extends BoundedWindow> windowCoder = (Coder<? extends BoundedWindow>) components.get(2);
- return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
- }
-
- private final Coder<K> keyCoder;
- private final Coder<ElemT> elemCoder;
- private final Coder<? extends BoundedWindow> windowCoder;
- private final Coder<Iterable<TimerData>> timersCoder;
- private final Coder<Iterable<WindowedValue<ElemT>>> elemsCoder;
-
- private KeyedWorkItemCoder(
- Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) {
- this.keyCoder = keyCoder;
- this.elemCoder = elemCoder;
- this.windowCoder = windowCoder;
- this.timersCoder = IterableCoder.of(TimerDataCoder.of(windowCoder));
- this.elemsCoder = IterableCoder.of(FullWindowedValueCoder.of(elemCoder, windowCoder));
- }
-
- public Coder<K> getKeyCoder() {
- return keyCoder;
- }
-
- public Coder<ElemT> getElementCoder() {
- return elemCoder;
- }
-
- @Override
- public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream, Coder.Context context)
- throws CoderException, IOException {
- Coder.Context nestedContext = context.nested();
- keyCoder.encode(value.key(), outStream, nestedContext);
- timersCoder.encode(value.timersIterable(), outStream, nestedContext);
- elemsCoder.encode(value.elementsIterable(), outStream, nestedContext);
- }
-
- @Override
- public KeyedWorkItem<K, ElemT> decode(InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- Coder.Context nestedContext = context.nested();
- K key = keyCoder.decode(inStream, nestedContext);
- Iterable<TimerData> timers = timersCoder.decode(inStream, nestedContext);
- Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, nestedContext);
- return KeyedWorkItems.workItem(key, timers, elems);
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return ImmutableList.of(keyCoder, elemCoder, windowCoder);
- }
-
- @Override
- public void verifyDeterministic() throws Coder.NonDeterministicException {
- keyCoder.verifyDeterministic();
- timersCoder.verifyDeterministic();
- elemsCoder.verifyDeterministic();
- }
-
- /**
- * {@inheritDoc}.
- *
- * {@link KeyedWorkItemCoder} is not consistent with equals as it can return a
- * {@link KeyedWorkItem} of a type different from the originally encoded type.
- */
- @Override
- public boolean consistentWithEquals() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItems.java
deleted file mode 100644
index 7434842..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItems.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.Iterables;
-import java.util.Collections;
-import java.util.Objects;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-
-/**
- * Static utility methods that provide {@link KeyedWorkItem} implementations.
- */
-public class KeyedWorkItems {
- /**
- * Returns an implementation of {@link KeyedWorkItem} that wraps around an elements iterable.
- *
- * @param <K> the key type
- * @param <ElemT> the element type
- */
- public static <K, ElemT> KeyedWorkItem<K, ElemT> elementsWorkItem(
- K key, Iterable<WindowedValue<ElemT>> elementsIterable) {
- return new ComposedKeyedWorkItem<>(key, Collections.<TimerData>emptyList(), elementsIterable);
- }
-
- /**
- * Returns an implementation of {@link KeyedWorkItem} that wraps around an timers iterable.
- *
- * @param <K> the key type
- * @param <ElemT> the element type
- */
- public static <K, ElemT> KeyedWorkItem<K, ElemT> timersWorkItem(
- K key, Iterable<TimerData> timersIterable) {
- return new ComposedKeyedWorkItem<>(
- key, timersIterable, Collections.<WindowedValue<ElemT>>emptyList());
- }
-
- /**
- * Returns an implementation of {@link KeyedWorkItem} that wraps around
- * an timers iterable and an elements iterable.
- *
- * @param <K> the key type
- * @param <ElemT> the element type
- */
- public static <K, ElemT> KeyedWorkItem<K, ElemT> workItem(
- K key, Iterable<TimerData> timersIterable, Iterable<WindowedValue<ElemT>> elementsIterable) {
- return new ComposedKeyedWorkItem<>(key, timersIterable, elementsIterable);
- }
-
- /**
- * A {@link KeyedWorkItem} composed of an underlying key, {@link TimerData} iterable, and element
- * iterable.
- */
- public static class ComposedKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
- private final K key;
- private final Iterable<TimerData> timers;
- private final Iterable<WindowedValue<ElemT>> elements;
-
- private ComposedKeyedWorkItem(
- K key, Iterable<TimerData> timers, Iterable<WindowedValue<ElemT>> elements) {
- this.key = key;
- this.timers = timers;
- this.elements = elements;
- }
-
- @Override
- public K key() {
- return key;
- }
-
- @Override
- public Iterable<TimerData> timersIterable() {
- return timers;
- }
-
- @Override
- public Iterable<WindowedValue<ElemT>> elementsIterable() {
- return elements;
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof ComposedKeyedWorkItem)) {
- return false;
- }
- KeyedWorkItem<?, ?> that = (KeyedWorkItem<?, ?>) other;
- return Objects.equals(this.key, that.key())
- && Iterables.elementsEqual(this.timersIterable(), that.timersIterable())
- && Iterables.elementsEqual(this.elementsIterable(), that.elementsIterable());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(key, timers, elements);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(ComposedKeyedWorkItem.class)
- .add("key", key)
- .add("elements", elements)
- .add("timers", timers)
- .toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/sdks/java/core/src/test/java/org/apache/beam/sdk/util/KeyedWorkItemCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/KeyedWorkItemCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/KeyedWorkItemCoderTest.java
deleted file mode 100644
index 1974d9e..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/KeyedWorkItemCoderTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link KeyedWorkItems}.
- */
-@RunWith(JUnit4.class)
-public class KeyedWorkItemCoderTest {
- @Test
- public void testCoderProperties() throws Exception {
- CoderProperties.coderSerializable(
- KeyedWorkItemCoder.of(StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE));
- }
-
- @Test
- public void testEncodeDecodeEqual() throws Exception {
- Iterable<TimerData> timers =
- ImmutableList.<TimerData>of(
- TimerData.of(StateNamespaces.global(), new Instant(500L), TimeDomain.EVENT_TIME));
- Iterable<WindowedValue<Integer>> elements =
- ImmutableList.of(
- WindowedValue.valueInGlobalWindow(1),
- WindowedValue.valueInGlobalWindow(4),
- WindowedValue.valueInGlobalWindow(8));
-
- KeyedWorkItemCoder<String, Integer> coder =
- KeyedWorkItemCoder.of(StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE);
-
- CoderProperties.coderDecodeEncodeEqual(coder, KeyedWorkItems.workItem("foo", timers, elements));
- CoderProperties.coderDecodeEncodeEqual(coder, KeyedWorkItems.elementsWorkItem("foo", elements));
- CoderProperties.coderDecodeEncodeEqual(
- coder, KeyedWorkItems.<String, Integer>timersWorkItem("foo", timers));
- }
-}