You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/05 22:55:20 UTC

[2/3] incubator-beam git commit: Moved KeyedWorkItem and related classes to runners-core

Moved KeyedWorkItem and related classes to runners-core


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8a2f020f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8a2f020f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8a2f020f

Branch: refs/heads/master
Commit: 8a2f020f9781340e60609a5a8ec537871ae29570
Parents: 81d1295
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Nov 30 20:46:04 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Dec 5 14:55:02 2016 -0800

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java       |   4 +-
 .../apache/beam/runners/core/DoFnRunner.java    |   1 -
 .../apache/beam/runners/core/DoFnRunners.java   |   1 -
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   1 -
 .../apache/beam/runners/core/KeyedWorkItem.java |  44 +++++++
 .../beam/runners/core/KeyedWorkItemCoder.java   | 130 +++++++++++++++++++
 .../beam/runners/core/KeyedWorkItems.java       | 122 +++++++++++++++++
 .../core/LateDataDroppingDoFnRunner.java        |   2 -
 .../beam/runners/core/SplittableParDo.java      |   2 -
 .../runners/core/KeyedWorkItemCoderTest.java    |  64 +++++++++
 .../beam/runners/core/SplittableParDoTest.java  |   2 -
 ...ectGBKIntoKeyedWorkItemsOverrideFactory.java |   2 +-
 .../beam/runners/direct/DirectGroupByKey.java   |   4 +-
 .../direct/ExecutorServiceParallelExecutor.java |   4 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   2 +-
 .../direct/GroupByKeyOnlyEvaluatorFactory.java  |   4 +-
 ...littableProcessElementsEvaluatorFactory.java |   2 +-
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |   4 +-
 .../GroupByKeyOnlyEvaluatorFactoryTest.java     |   4 +-
 .../streaming/SingletonKeyedWorkItem.java       |   2 +-
 .../streaming/SingletonKeyedWorkItemCoder.java  |   4 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   6 +-
 .../wrappers/streaming/WorkItemKeySelector.java |   2 +-
 .../org/apache/beam/sdk/util/KeyedWorkItem.java |  43 ------
 .../beam/sdk/util/KeyedWorkItemCoder.java       | 128 ------------------
 .../apache/beam/sdk/util/KeyedWorkItems.java    | 121 -----------------
 .../beam/sdk/util/KeyedWorkItemCoderTest.java   |  62 ---------
 27 files changed, 381 insertions(+), 386 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index f49c785..48ac177 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -42,6 +42,8 @@ import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -54,8 +56,6 @@ import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index c84122b..aac8e8f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext;
-import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 3840423..da16573 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ExecutionContext.StepContext;
-import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 8b10813..2082269 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java
new file mode 100644
index 0000000..c75fc25
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItem.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * Interface that contains all the timers and elements associated with a specific work item.
+ *
+ * @param <K> the key type
+ * @param <ElemT> the element type
+ */
+public interface KeyedWorkItem<K, ElemT> {
+  /**
+   * Returns the key.
+   */
+  K key();
+
+  /**
+   * Returns an iterable containing the timers.
+   */
+  Iterable<TimerData> timersIterable();
+
+  /**
+   * Returns an iterable containing the elements.
+   */
+  Iterable<WindowedValue<ElemT>> elementsIterable();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
new file mode 100644
index 0000000..95be047
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.TimerInternals.TimerDataCoder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+
+/**
+ * A {@link Coder} for {@link KeyedWorkItem KeyedWorkItems}.
+ */
+public class KeyedWorkItemCoder<K, ElemT> extends StandardCoder<KeyedWorkItem<K, ElemT>> {
+  /**
+   * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
+   * coder.
+   */
+  public static <K, ElemT> KeyedWorkItemCoder<K, ElemT> of(
+      Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) {
+    return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
+  }
+
+  @JsonCreator
+  public static <K, ElemT> KeyedWorkItemCoder<K, ElemT> of(
+      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
+    checkArgument(components.size() == 3, "Expecting 3 components, got %s", components.size());
+    @SuppressWarnings("unchecked")
+    Coder<K> keyCoder = (Coder<K>) components.get(0);
+    @SuppressWarnings("unchecked")
+    Coder<ElemT> elemCoder = (Coder<ElemT>) components.get(1);
+    @SuppressWarnings("unchecked")
+    Coder<? extends BoundedWindow> windowCoder = (Coder<? extends BoundedWindow>) components.get(2);
+    return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
+  }
+
+  private final Coder<K> keyCoder;
+  private final Coder<ElemT> elemCoder;
+  private final Coder<? extends BoundedWindow> windowCoder;
+  private final Coder<Iterable<TimerData>> timersCoder;
+  private final Coder<Iterable<WindowedValue<ElemT>>> elemsCoder;
+
+  private KeyedWorkItemCoder(
+      Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) {
+    this.keyCoder = keyCoder;
+    this.elemCoder = elemCoder;
+    this.windowCoder = windowCoder;
+    this.timersCoder = IterableCoder.of(TimerDataCoder.of(windowCoder));
+    this.elemsCoder = IterableCoder.of(FullWindowedValueCoder.of(elemCoder, windowCoder));
+  }
+
+  public Coder<K> getKeyCoder() {
+    return keyCoder;
+  }
+
+  public Coder<ElemT> getElementCoder() {
+    return elemCoder;
+  }
+
+  @Override
+  public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream, Coder.Context context)
+      throws CoderException, IOException {
+    Coder.Context nestedContext = context.nested();
+    keyCoder.encode(value.key(), outStream, nestedContext);
+    timersCoder.encode(value.timersIterable(), outStream, nestedContext);
+    elemsCoder.encode(value.elementsIterable(), outStream, nestedContext);
+  }
+
+  @Override
+  public KeyedWorkItem<K, ElemT> decode(InputStream inStream, Coder.Context context)
+      throws CoderException, IOException {
+    Coder.Context nestedContext = context.nested();
+    K key = keyCoder.decode(inStream, nestedContext);
+    Iterable<TimerData> timers = timersCoder.decode(inStream, nestedContext);
+    Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, nestedContext);
+    return KeyedWorkItems.workItem(key, timers, elems);
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return ImmutableList.of(keyCoder, elemCoder, windowCoder);
+  }
+
+  @Override
+  public void verifyDeterministic() throws Coder.NonDeterministicException {
+    keyCoder.verifyDeterministic();
+    timersCoder.verifyDeterministic();
+    elemsCoder.verifyDeterministic();
+  }
+
+  /**
+   * {@inheritDoc}.
+   *
+   * {@link KeyedWorkItemCoder} is not consistent with equals as it can return a
+   * {@link KeyedWorkItem} of a type different from the originally encoded type.
+   */
+  @Override
+  public boolean consistentWithEquals() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java
new file mode 100644
index 0000000..94c3bb6
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItems.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Iterables;
+import java.util.Collections;
+import java.util.Objects;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * Static utility methods that provide {@link KeyedWorkItem} implementations.
+ */
+public class KeyedWorkItems {
+  /**
+   * Returns an implementation of {@link KeyedWorkItem} that wraps around an elements iterable.
+   *
+   * @param <K> the key type
+   * @param <ElemT> the element type
+   */
+  public static <K, ElemT> KeyedWorkItem<K, ElemT> elementsWorkItem(
+      K key, Iterable<WindowedValue<ElemT>> elementsIterable) {
+    return new ComposedKeyedWorkItem<>(key, Collections.<TimerData>emptyList(), elementsIterable);
+  }
+
+  /**
+   * Returns an implementation of {@link KeyedWorkItem} that wraps around an timers iterable.
+   *
+   * @param <K> the key type
+   * @param <ElemT> the element type
+   */
+  public static <K, ElemT> KeyedWorkItem<K, ElemT> timersWorkItem(
+      K key, Iterable<TimerData> timersIterable) {
+    return new ComposedKeyedWorkItem<>(
+        key, timersIterable, Collections.<WindowedValue<ElemT>>emptyList());
+  }
+
+  /**
+   * Returns an implementation of {@link KeyedWorkItem} that wraps around
+   * an timers iterable and an elements iterable.
+   *
+   * @param <K> the key type
+   * @param <ElemT> the element type
+   */
+  public static <K, ElemT> KeyedWorkItem<K, ElemT> workItem(
+      K key, Iterable<TimerData> timersIterable, Iterable<WindowedValue<ElemT>> elementsIterable) {
+    return new ComposedKeyedWorkItem<>(key, timersIterable, elementsIterable);
+  }
+
+  /**
+   * A {@link KeyedWorkItem} composed of an underlying key, {@link TimerData} iterable, and element
+   * iterable.
+   */
+  public static class ComposedKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
+    private final K key;
+    private final Iterable<TimerData> timers;
+    private final Iterable<WindowedValue<ElemT>> elements;
+
+    private ComposedKeyedWorkItem(
+        K key, Iterable<TimerData> timers, Iterable<WindowedValue<ElemT>> elements) {
+      this.key = key;
+      this.timers = timers;
+      this.elements = elements;
+    }
+
+    @Override
+    public K key() {
+      return key;
+    }
+
+    @Override
+    public Iterable<TimerData> timersIterable() {
+      return timers;
+    }
+
+    @Override
+    public Iterable<WindowedValue<ElemT>> elementsIterable() {
+      return elements;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == null || !(other instanceof ComposedKeyedWorkItem)) {
+        return false;
+      }
+      KeyedWorkItem<?, ?> that = (KeyedWorkItem<?, ?>) other;
+      return Objects.equals(this.key, that.key())
+          && Iterables.elementsEqual(this.timersIterable(), that.timersIterable())
+          && Iterables.elementsEqual(this.elementsIterable(), that.elementsIterable());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(key, timers, elements);
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(ComposedKeyedWorkItem.class)
+          .add("key", key)
+          .add("elements", elements)
+          .add("timers", timers)
+          .toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 63a80d2..b6f700f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -24,8 +24,6 @@ import com.google.common.collect.Iterables;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 80fd17b..a633111 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -49,8 +49,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItemCoder;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerInternals;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
new file mode 100644
index 0000000..37fabdd
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link KeyedWorkItems}.
+ */
+@RunWith(JUnit4.class)
+public class KeyedWorkItemCoderTest {
+  @Test
+  public void testCoderProperties() throws Exception {
+    CoderProperties.coderSerializable(
+        KeyedWorkItemCoder.of(StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
+  public void testEncodeDecodeEqual() throws Exception {
+    Iterable<TimerData> timers =
+        ImmutableList.<TimerData>of(
+            TimerData.of(StateNamespaces.global(), new Instant(500L), TimeDomain.EVENT_TIME));
+    Iterable<WindowedValue<Integer>> elements =
+        ImmutableList.of(
+            WindowedValue.valueInGlobalWindow(1),
+            WindowedValue.valueInGlobalWindow(4),
+            WindowedValue.valueInGlobalWindow(8));
+
+    KeyedWorkItemCoder<String, Integer> coder =
+        KeyedWorkItemCoder.of(StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE);
+
+    CoderProperties.coderDecodeEncodeEqual(coder, KeyedWorkItems.workItem("foo", timers, elements));
+    CoderProperties.coderDecodeEncodeEqual(coder, KeyedWorkItems.elementsWorkItem("foo", elements));
+    CoderProperties.coderDecodeEncodeEqual(
+        coder, KeyedWorkItems.<String, Integer>timersWorkItem("foo", timers));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index b13d839..cf96b66 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -46,8 +46,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateInternals;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
index 04becd7..1fa059c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index efee801..21776e7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -20,13 +20,13 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItemCoder;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 929d09d..a308295 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -43,13 +43,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 9d25bc6..5c6b2c1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -28,6 +28,7 @@ import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.SystemReduceFn;
@@ -44,7 +45,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
index 4d691ea..20d619f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
@@ -36,8 +38,6 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 0eca710..aae1149 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -20,13 +20,13 @@ package org.apache.beam.runners.direct;
 import java.util.Collection;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.ElementAndRestriction;
+import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateInternals;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index a726817..7ba38ce 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -25,6 +25,8 @@ import static org.mockito.Mockito.when;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multiset;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
@@ -33,8 +35,6 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
index 3e5af14..23340c6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
@@ -25,6 +25,8 @@ import static org.mockito.Mockito.when;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multiset;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
@@ -33,8 +35,6 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
index 6d2582b..b53658e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import java.util.Collections;
-import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
index 37454a3..ad30688 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -26,12 +26,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItemCoder;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.WindowedValue;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 432dc64..f2d7f1c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -39,6 +39,8 @@ import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
 import org.apache.beam.sdk.coders.Coder;
@@ -47,8 +49,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ExecutionContext;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -59,8 +59,6 @@ import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-
-
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
index 7829163..1dff367 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
@@ -18,9 +18,9 @@
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import java.nio.ByteBuffer;
+import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItem.java
deleted file mode 100644
index b273466..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItem.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-
-/**
- * Interface that contains all the timers and elements associated with a specific work item.
- *
- * @param <K> the key type
- * @param <ElemT> the element type
- */
-public interface KeyedWorkItem<K, ElemT> {
-  /**
-   * Returns the key.
-   */
-  K key();
-
-  /**
-   * Returns an iterable containing the timers.
-   */
-  Iterable<TimerData> timersIterable();
-
-  /**
-   * Returns an iterable containing the elements.
-   */
-  Iterable<WindowedValue<ElemT>> elementsIterable();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java
deleted file mode 100644
index a6e3d6c..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.TimerInternals.TimerDataCoder;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-
-/**
- * A {@link Coder} for {@link KeyedWorkItem KeyedWorkItems}.
- */
-public class KeyedWorkItemCoder<K, ElemT> extends StandardCoder<KeyedWorkItem<K, ElemT>> {
-  /**
-   * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
-   * coder.
-   */
-  public static <K, ElemT> KeyedWorkItemCoder<K, ElemT> of(
-      Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) {
-    return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
-  }
-
-  @JsonCreator
-  public static <K, ElemT> KeyedWorkItemCoder<K, ElemT> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
-    checkArgument(components.size() == 3, "Expecting 3 components, got %s", components.size());
-    @SuppressWarnings("unchecked")
-    Coder<K> keyCoder = (Coder<K>) components.get(0);
-    @SuppressWarnings("unchecked")
-    Coder<ElemT> elemCoder = (Coder<ElemT>) components.get(1);
-    @SuppressWarnings("unchecked")
-    Coder<? extends BoundedWindow> windowCoder = (Coder<? extends BoundedWindow>) components.get(2);
-    return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
-  }
-
-  private final Coder<K> keyCoder;
-  private final Coder<ElemT> elemCoder;
-  private final Coder<? extends BoundedWindow> windowCoder;
-  private final Coder<Iterable<TimerData>> timersCoder;
-  private final Coder<Iterable<WindowedValue<ElemT>>> elemsCoder;
-
-  private KeyedWorkItemCoder(
-      Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) {
-    this.keyCoder = keyCoder;
-    this.elemCoder = elemCoder;
-    this.windowCoder = windowCoder;
-    this.timersCoder = IterableCoder.of(TimerDataCoder.of(windowCoder));
-    this.elemsCoder = IterableCoder.of(FullWindowedValueCoder.of(elemCoder, windowCoder));
-  }
-
-  public Coder<K> getKeyCoder() {
-    return keyCoder;
-  }
-
-  public Coder<ElemT> getElementCoder() {
-    return elemCoder;
-  }
-
-  @Override
-  public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream, Coder.Context context)
-      throws CoderException, IOException {
-    Coder.Context nestedContext = context.nested();
-    keyCoder.encode(value.key(), outStream, nestedContext);
-    timersCoder.encode(value.timersIterable(), outStream, nestedContext);
-    elemsCoder.encode(value.elementsIterable(), outStream, nestedContext);
-  }
-
-  @Override
-  public KeyedWorkItem<K, ElemT> decode(InputStream inStream, Coder.Context context)
-      throws CoderException, IOException {
-    Coder.Context nestedContext = context.nested();
-    K key = keyCoder.decode(inStream, nestedContext);
-    Iterable<TimerData> timers = timersCoder.decode(inStream, nestedContext);
-    Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, nestedContext);
-    return KeyedWorkItems.workItem(key, timers, elems);
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return ImmutableList.of(keyCoder, elemCoder, windowCoder);
-  }
-
-  @Override
-  public void verifyDeterministic() throws Coder.NonDeterministicException {
-    keyCoder.verifyDeterministic();
-    timersCoder.verifyDeterministic();
-    elemsCoder.verifyDeterministic();
-  }
-
-  /**
-   * {@inheritDoc}.
-   *
-   * {@link KeyedWorkItemCoder} is not consistent with equals as it can return a
-   * {@link KeyedWorkItem} of a type different from the originally encoded type.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItems.java
deleted file mode 100644
index 7434842..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItems.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.Iterables;
-import java.util.Collections;
-import java.util.Objects;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-
-/**
- * Static utility methods that provide {@link KeyedWorkItem} implementations.
- */
-public class KeyedWorkItems {
-  /**
-   * Returns an implementation of {@link KeyedWorkItem} that wraps around an elements iterable.
-   *
-   * @param <K> the key type
-   * @param <ElemT> the element type
-   */
-  public static <K, ElemT> KeyedWorkItem<K, ElemT> elementsWorkItem(
-      K key, Iterable<WindowedValue<ElemT>> elementsIterable) {
-    return new ComposedKeyedWorkItem<>(key, Collections.<TimerData>emptyList(), elementsIterable);
-  }
-
-  /**
-   * Returns an implementation of {@link KeyedWorkItem} that wraps around an timers iterable.
-   *
-   * @param <K> the key type
-   * @param <ElemT> the element type
-   */
-  public static <K, ElemT> KeyedWorkItem<K, ElemT> timersWorkItem(
-      K key, Iterable<TimerData> timersIterable) {
-    return new ComposedKeyedWorkItem<>(
-        key, timersIterable, Collections.<WindowedValue<ElemT>>emptyList());
-  }
-
-  /**
-   * Returns an implementation of {@link KeyedWorkItem} that wraps around
-   * an timers iterable and an elements iterable.
-   *
-   * @param <K> the key type
-   * @param <ElemT> the element type
-   */
-  public static <K, ElemT> KeyedWorkItem<K, ElemT> workItem(
-      K key, Iterable<TimerData> timersIterable, Iterable<WindowedValue<ElemT>> elementsIterable) {
-    return new ComposedKeyedWorkItem<>(key, timersIterable, elementsIterable);
-  }
-
-  /**
-   * A {@link KeyedWorkItem} composed of an underlying key, {@link TimerData} iterable, and element
-   * iterable.
-   */
-  public static class ComposedKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
-    private final K key;
-    private final Iterable<TimerData> timers;
-    private final Iterable<WindowedValue<ElemT>> elements;
-
-    private ComposedKeyedWorkItem(
-        K key, Iterable<TimerData> timers, Iterable<WindowedValue<ElemT>> elements) {
-      this.key = key;
-      this.timers = timers;
-      this.elements = elements;
-    }
-
-    @Override
-    public K key() {
-      return key;
-    }
-
-    @Override
-    public Iterable<TimerData> timersIterable() {
-      return timers;
-    }
-
-    @Override
-    public Iterable<WindowedValue<ElemT>> elementsIterable() {
-      return elements;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (other == null || !(other instanceof ComposedKeyedWorkItem)) {
-        return false;
-      }
-      KeyedWorkItem<?, ?> that = (KeyedWorkItem<?, ?>) other;
-      return Objects.equals(this.key, that.key())
-          && Iterables.elementsEqual(this.timersIterable(), that.timersIterable())
-          && Iterables.elementsEqual(this.elementsIterable(), that.elementsIterable());
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(key, timers, elements);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(ComposedKeyedWorkItem.class)
-          .add("key", key)
-          .add("elements", elements)
-          .add("timers", timers)
-          .toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a2f020f/sdks/java/core/src/test/java/org/apache/beam/sdk/util/KeyedWorkItemCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/KeyedWorkItemCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/KeyedWorkItemCoderTest.java
deleted file mode 100644
index 1974d9e..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/KeyedWorkItemCoderTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link KeyedWorkItems}.
- */
-@RunWith(JUnit4.class)
-public class KeyedWorkItemCoderTest {
-  @Test
-  public void testCoderProperties() throws Exception {
-    CoderProperties.coderSerializable(
-        KeyedWorkItemCoder.of(StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE));
-  }
-
-  @Test
-  public void testEncodeDecodeEqual() throws Exception {
-    Iterable<TimerData> timers =
-        ImmutableList.<TimerData>of(
-            TimerData.of(StateNamespaces.global(), new Instant(500L), TimeDomain.EVENT_TIME));
-    Iterable<WindowedValue<Integer>> elements =
-        ImmutableList.of(
-            WindowedValue.valueInGlobalWindow(1),
-            WindowedValue.valueInGlobalWindow(4),
-            WindowedValue.valueInGlobalWindow(8));
-
-    KeyedWorkItemCoder<String, Integer> coder =
-        KeyedWorkItemCoder.of(StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE);
-
-    CoderProperties.coderDecodeEncodeEqual(coder, KeyedWorkItems.workItem("foo", timers, elements));
-    CoderProperties.coderDecodeEncodeEqual(coder, KeyedWorkItems.elementsWorkItem("foo", elements));
-    CoderProperties.coderDecodeEncodeEqual(
-        coder, KeyedWorkItems.<String, Integer>timersWorkItem("foo", timers));
-  }
-}