You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/23 03:05:12 UTC

[46/50] beam git commit: Uses KV in SplittableParDo expansion instead of ElementAndRestriction

Uses KV in SplittableParDo expansion instead of ElementAndRestriction

This is a workaround for the following issue.

ElementAndRestriction is in runners-core, which may be shaded by runners
(and is shaded by Dataflow runner), hence it should be *both* produced
and consumed by workers - but currently it's produced by (shaded)
SplittableParDo and consumed by (differently shaded) ProcessFn in the
runner's worker code.

There are several ways out of this, e.g. moving EAR into the SDK (icky
because it's an implementation detail of SplittableParDo), or using
a type that's already in the SDK. There may be other more complicated
ways too.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/799173fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/799173fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/799173fa

Branch: refs/heads/gearpump-runner
Commit: 799173fac4e07dab4547ba21971922336cd72c62
Parents: 6f12e7d
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jun 21 15:21:32 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jun 22 17:56:00 2017 -0700

----------------------------------------------------------------------
 .../construction/ElementAndRestriction.java     |  42 -------
 .../ElementAndRestrictionCoder.java             |  88 -------------
 .../core/construction/SplittableParDo.java      |  39 +++---
 .../ElementAndRestrictionCoderTest.java         | 126 -------------------
 .../beam/runners/core/ProcessFnRunner.java      |  16 +--
 .../core/SplittableParDoViaKeyedWorkItems.java  |  49 ++++----
 .../core/SplittableParDoProcessFnTest.java      |  16 +--
 ...littableProcessElementsEvaluatorFactory.java |  37 +++---
 .../FlinkStreamingTransformTranslators.java     |  19 +--
 .../streaming/SplittableDoFnOperator.java       |  16 +--
 10 files changed, 77 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/799173fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestriction.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestriction.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestriction.java
deleted file mode 100644
index 53a86b1..0000000
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestriction.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.construction;
-
-import com.google.auto.value.AutoValue;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.transforms.DoFn;
-
-/**
- * A tuple of an element and a restriction applied to processing it with a
- * <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
- */
-@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
-@AutoValue
-public abstract class ElementAndRestriction<ElementT, RestrictionT> {
-  /** The element to process. */
-  public abstract ElementT element();
-
-  /** The restriction applied to processing the element. */
-  public abstract RestrictionT restriction();
-
-  /** Constructs the {@link ElementAndRestriction}. */
-  public static <InputT, RestrictionT> ElementAndRestriction<InputT, RestrictionT> of(
-      InputT element, RestrictionT restriction) {
-    return new AutoValue_ElementAndRestriction<>(element, restriction);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/799173fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoder.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoder.java
deleted file mode 100644
index 5ff0aae..0000000
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoder.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.construction;
-
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StructuredCoder;
-
-/** A {@link Coder} for {@link ElementAndRestriction}. */
-@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
-public class ElementAndRestrictionCoder<ElementT, RestrictionT>
-    extends StructuredCoder<ElementAndRestriction<ElementT, RestrictionT>> {
-  private final Coder<ElementT> elementCoder;
-  private final Coder<RestrictionT> restrictionCoder;
-
-  /**
-   * Creates an {@link ElementAndRestrictionCoder} from an element coder and a restriction coder.
-   */
-  public static <ElementT, RestrictionT> ElementAndRestrictionCoder<ElementT, RestrictionT> of(
-      Coder<ElementT> elementCoder, Coder<RestrictionT> restrictionCoder) {
-    return new ElementAndRestrictionCoder<>(elementCoder, restrictionCoder);
-  }
-
-  private ElementAndRestrictionCoder(
-      Coder<ElementT> elementCoder, Coder<RestrictionT> restrictionCoder) {
-    this.elementCoder = elementCoder;
-    this.restrictionCoder = restrictionCoder;
-  }
-
-  @Override
-  public void encode(
-      ElementAndRestriction<ElementT, RestrictionT> value, OutputStream outStream)
-      throws IOException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null ElementAndRestriction");
-    }
-    elementCoder.encode(value.element(), outStream);
-    restrictionCoder.encode(value.restriction(), outStream);
-  }
-
-  @Override
-  public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream)
-      throws IOException {
-    ElementT key = elementCoder.decode(inStream);
-    RestrictionT value = restrictionCoder.decode(inStream);
-    return ElementAndRestriction.of(key, value);
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return ImmutableList.of(elementCoder, restrictionCoder);
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    elementCoder.verifyDeterministic();
-    restrictionCoder.verifyDeterministic();
-  }
-
-  public Coder<ElementT> getElementCoder() {
-    return elementCoder;
-  }
-
-  public Coder<RestrictionT> getRestrictionCoder() {
-    return restrictionCoder;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/799173fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 665e39d..5ccafcb 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -54,7 +55,7 @@ import org.apache.beam.sdk.values.WindowingStrategy;
  * <li>Explode windows, since splitting within each window has to happen independently
  * <li>Assign a unique key to each element/restriction pair
  * <li>Process the keyed element/restriction pairs in a runner-specific way with the splittable
- *   {@link DoFn}'s {@link DoFn.ProcessElement} method.
+ *     {@link DoFn}'s {@link DoFn.ProcessElement} method.
  * </ol>
  *
  * <p>This transform is intended as a helper for internal use by runners when implementing {@code
@@ -93,10 +94,9 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     Coder<RestrictionT> restrictionCoder =
         DoFnInvokers.invokerFor(fn)
             .invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry());
-    Coder<ElementAndRestriction<InputT, RestrictionT>> splitCoder =
-        ElementAndRestrictionCoder.of(input.getCoder(), restrictionCoder);
+    Coder<KV<InputT, RestrictionT>> splitCoder = KvCoder.of(input.getCoder(), restrictionCoder);
 
-    PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>> keyedRestrictions =
+    PCollection<KV<String, KV<InputT, RestrictionT>>> keyedRestrictions =
         input
             .apply(
                 "Pair with initial restriction",
@@ -107,12 +107,10 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
             // ProcessFn requires all input elements to be in a single window and have a single
             // element per work item. This must precede the unique keying so each key has a single
             // associated element.
-            .apply(
-                "Explode windows",
-                ParDo.of(new ExplodeWindowsFn<ElementAndRestriction<InputT, RestrictionT>>()))
+            .apply("Explode windows", ParDo.of(new ExplodeWindowsFn<KV<InputT, RestrictionT>>()))
             .apply(
                 "Assign unique key",
-                WithKeys.of(new RandomUniqueKeyFn<ElementAndRestriction<InputT, RestrictionT>>()));
+                WithKeys.of(new RandomUniqueKeyFn<KV<InputT, RestrictionT>>()));
 
     return keyedRestrictions.apply(
         "ProcessKeyedElements",
@@ -140,12 +138,11 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
 
   /**
    * Runner-specific primitive {@link PTransform} that invokes the {@link DoFn.ProcessElement}
-   * method for a splittable {@link DoFn} on each {@link ElementAndRestriction} of the input {@link
-   * PCollection} of {@link KV KVs} keyed with arbitrary but globally unique keys.
+   * method for a splittable {@link DoFn} on each {@link KV} of the input {@link PCollection} of
+   * {@link KV KVs} keyed with arbitrary but globally unique keys.
    */
   public static class ProcessKeyedElements<InputT, OutputT, RestrictionT>
-      extends RawPTransform<
-          PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>>, PCollectionTuple> {
+      extends RawPTransform<PCollection<KV<String, KV<InputT, RestrictionT>>>, PCollectionTuple> {
     private final DoFn<InputT, OutputT> fn;
     private final Coder<InputT> elementCoder;
     private final Coder<RestrictionT> restrictionCoder;
@@ -208,9 +205,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     }
 
     @Override
-    public PCollectionTuple expand(
-        PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>>
-            input) {
+    public PCollectionTuple expand(PCollection<KV<String, KV<InputT, RestrictionT>>> input) {
       return createPrimitiveOutputFor(
           input, fn, mainOutputTag, additionalOutputTags, windowingStrategy);
     }
@@ -257,7 +252,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
    * Pairs each input element with its initial restriction using the given splittable {@link DoFn}.
    */
   private static class PairWithRestrictionFn<InputT, OutputT, RestrictionT>
-      extends DoFn<InputT, ElementAndRestriction<InputT, RestrictionT>> {
+      extends DoFn<InputT, KV<InputT, RestrictionT>> {
     private DoFn<InputT, OutputT> fn;
     private transient DoFnInvoker<InputT, OutputT> invoker;
 
@@ -273,7 +268,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     @ProcessElement
     public void processElement(ProcessContext context) {
       context.output(
-          ElementAndRestriction.of(
+          KV.of(
               context.element(),
               invoker.<RestrictionT>invokeGetInitialRestriction(context.element())));
     }
@@ -281,9 +276,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
 
   /** Splits the restriction using the given {@link SplitRestriction} method. */
   private static class SplitRestrictionFn<InputT, RestrictionT>
-      extends DoFn<
-          ElementAndRestriction<InputT, RestrictionT>,
-          ElementAndRestriction<InputT, RestrictionT>> {
+      extends DoFn<KV<InputT, RestrictionT>, KV<InputT, RestrictionT>> {
     private final DoFn<InputT, ?> splittableFn;
     private transient DoFnInvoker<InputT, ?> invoker;
 
@@ -298,14 +291,14 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
 
     @ProcessElement
     public void processElement(final ProcessContext c) {
-      final InputT element = c.element().element();
+      final InputT element = c.element().getKey();
       invoker.invokeSplitRestriction(
           element,
-          c.element().restriction(),
+          c.element().getValue(),
           new OutputReceiver<RestrictionT>() {
             @Override
             public void output(RestrictionT part) {
-              c.output(ElementAndRestriction.of(element, part));
+              c.output(KV.of(element, part));
             }
           });
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/799173fa/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoderTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoderTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoderTest.java
deleted file mode 100644
index 051cbaa..0000000
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoderTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.construction;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-
-/**
- * Tests for {@link ElementAndRestrictionCoder}.
- */
-@RunWith(Parameterized.class)
-public class ElementAndRestrictionCoderTest<K, V> {
-  private static class CoderAndData<T> {
-    Coder<T> coder;
-    List<T> data;
-  }
-
-  private static class AnyCoderAndData {
-    private CoderAndData<?> coderAndData;
-  }
-
-  private static <T> AnyCoderAndData coderAndData(Coder<T> coder, List<T> data) {
-    CoderAndData<T> coderAndData = new CoderAndData<>();
-    coderAndData.coder = coder;
-    coderAndData.data = data;
-    AnyCoderAndData res = new AnyCoderAndData();
-    res.coderAndData = coderAndData;
-    return res;
-  }
-
-  private static final List<AnyCoderAndData> TEST_DATA =
-      Arrays.asList(
-          coderAndData(
-              VarIntCoder.of(), Arrays.asList(-1, 0, 1, 13, Integer.MAX_VALUE, Integer.MIN_VALUE)),
-          coderAndData(
-              BigEndianLongCoder.of(),
-              Arrays.asList(-1L, 0L, 1L, 13L, Long.MAX_VALUE, Long.MIN_VALUE)),
-          coderAndData(StringUtf8Coder.of(), Arrays.asList("", "hello", "goodbye", "1")),
-          coderAndData(
-              ElementAndRestrictionCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
-              Arrays.asList(
-                  ElementAndRestriction.of("", -1),
-                  ElementAndRestriction.of("hello", 0),
-                  ElementAndRestriction.of("goodbye", Integer.MAX_VALUE))),
-          coderAndData(
-              ListCoder.of(VarLongCoder.of()),
-              Arrays.asList(Arrays.asList(1L, 2L, 3L), Collections.<Long>emptyList())));
-
-  @Parameterized.Parameters(name = "{index}: keyCoder={0} key={1} valueCoder={2} value={3}")
-  public static Collection<Object[]> data() {
-    List<Object[]> parameters = new ArrayList<>();
-    for (AnyCoderAndData keyCoderAndData : TEST_DATA) {
-      Coder keyCoder = keyCoderAndData.coderAndData.coder;
-      for (Object key : keyCoderAndData.coderAndData.data) {
-        for (AnyCoderAndData valueCoderAndData : TEST_DATA) {
-          Coder valueCoder = valueCoderAndData.coderAndData.coder;
-          for (Object value : valueCoderAndData.coderAndData.data) {
-            parameters.add(new Object[] {keyCoder, key, valueCoder, value});
-          }
-        }
-      }
-    }
-    return parameters;
-  }
-
-  @Parameter(0)
-  public Coder<K> keyCoder;
-  @Parameter(1)
-  public K key;
-  @Parameter(2)
-  public Coder<V> valueCoder;
-  @Parameter(3)
-  public V value;
-
-  @Test
-  @SuppressWarnings("rawtypes")
-  public void testDecodeEncodeEqual() throws Exception {
-    CoderProperties.coderDecodeEncodeEqual(
-        ElementAndRestrictionCoder.of(keyCoder, valueCoder),
-        ElementAndRestriction.of(key, value));
-  }
-
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void encodeNullThrowsCoderException() throws Exception {
-    thrown.expect(CoderException.class);
-    thrown.expectMessage("cannot encode a null ElementAndRestriction");
-
-    CoderUtils.encodeToBase64(
-        ElementAndRestrictionCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/799173fa/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
index 31e86bd..88275d6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -24,11 +24,11 @@ import java.util.Collection;
 import java.util.Collections;
 import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.core.construction.ElementAndRestriction;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.joda.time.Instant;
 
@@ -38,16 +38,13 @@ import org.joda.time.Instant;
  */
 public class ProcessFnRunner<InputT, OutputT, RestrictionT>
     implements PushbackSideInputDoFnRunner<
-        KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
-  private final DoFnRunner<
-          KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
-      underlying;
+        KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> {
+  private final DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> underlying;
   private final Collection<PCollectionView<?>> views;
   private final ReadyCheckingSideInputReader sideInputReader;
 
   ProcessFnRunner(
-      DoFnRunner<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
-          underlying,
+      DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> underlying,
       Collection<PCollectionView<?>> views,
       ReadyCheckingSideInputReader sideInputReader) {
     this.underlying = underlying;
@@ -61,10 +58,9 @@ public class ProcessFnRunner<InputT, OutputT, RestrictionT>
   }
 
   @Override
-  public Iterable<WindowedValue<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>>
+  public Iterable<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>>
       processElementInReadyWindows(
-          WindowedValue<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
-              windowedKWI) {
+          WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>> windowedKWI) {
     checkTrivialOuterWindows(windowedKWI);
     BoundedWindow window = getUnderlyingWindow(windowedKWI.getValue());
     if (!isReady(window)) {

http://git-wip-us.apache.org/repos/asf/beam/blob/799173fa/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index c4b086a..09f3b15 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.construction.ElementAndRestriction;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
@@ -86,15 +85,15 @@ public class SplittableParDoViaKeyedWorkItems {
   /** Overrides a {@link ProcessKeyedElements} into {@link SplittableProcessViaKeyedWorkItems}. */
   public static class OverrideFactory<InputT, OutputT, RestrictionT>
       implements PTransformOverrideFactory<
-          PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>>, PCollectionTuple,
-      ProcessKeyedElements<InputT, OutputT, RestrictionT>> {
+          PCollection<KV<String, KV<InputT, RestrictionT>>>, PCollectionTuple,
+          ProcessKeyedElements<InputT, OutputT, RestrictionT>> {
     @Override
     public PTransformReplacement<
-            PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>>, PCollectionTuple>
+            PCollection<KV<String, KV<InputT, RestrictionT>>>, PCollectionTuple>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>>,
-                    PCollectionTuple, ProcessKeyedElements<InputT, OutputT, RestrictionT>>
+                    PCollection<KV<String, KV<InputT, RestrictionT>>>, PCollectionTuple,
+                    ProcessKeyedElements<InputT, OutputT, RestrictionT>>
                 transform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(transform),
@@ -113,8 +112,7 @@ public class SplittableParDoViaKeyedWorkItems {
    * method for a splittable {@link DoFn}.
    */
   public static class SplittableProcessViaKeyedWorkItems<InputT, OutputT, RestrictionT>
-      extends PTransform<
-          PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>>, PCollectionTuple> {
+      extends PTransform<PCollection<KV<String, KV<InputT, RestrictionT>>>, PCollectionTuple> {
     private final ProcessKeyedElements<InputT, OutputT, RestrictionT> original;
 
     public SplittableProcessViaKeyedWorkItems(
@@ -123,15 +121,13 @@ public class SplittableParDoViaKeyedWorkItems {
     }
 
     @Override
-    public PCollectionTuple expand(
-        PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>> input) {
+    public PCollectionTuple expand(PCollection<KV<String, KV<InputT, RestrictionT>>> input) {
       return input
-          .apply(new GBKIntoKeyedWorkItems<String, ElementAndRestriction<InputT, RestrictionT>>())
+          .apply(new GBKIntoKeyedWorkItems<String, KV<InputT, RestrictionT>>())
           .setCoder(
               KeyedWorkItemCoder.of(
                   StringUtf8Coder.of(),
-                  ((KvCoder<String, ElementAndRestriction<InputT, RestrictionT>>) input.getCoder())
-                      .getValueCoder(),
+                  ((KvCoder<String, KV<InputT, RestrictionT>>) input.getCoder()).getValueCoder(),
                   input.getWindowingStrategy().getWindowFn().windowCoder()))
           .apply(new ProcessElements<>(original));
     }
@@ -141,8 +137,7 @@ public class SplittableParDoViaKeyedWorkItems {
   public static class ProcessElements<
           InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
       extends PTransform<
-          PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>,
-          PCollectionTuple> {
+          PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>, PCollectionTuple> {
     private final ProcessKeyedElements<InputT, OutputT, RestrictionT> original;
 
     public ProcessElements(ProcessKeyedElements<InputT, OutputT, RestrictionT> original) {
@@ -176,7 +171,7 @@ public class SplittableParDoViaKeyedWorkItems {
 
     @Override
     public PCollectionTuple expand(
-        PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> input) {
+        PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>> input) {
       return ProcessKeyedElements.createPrimitiveOutputFor(
           input,
           original.getFn(),
@@ -201,7 +196,7 @@ public class SplittableParDoViaKeyedWorkItems {
   @VisibleForTesting
   public static class ProcessFn<
           InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
-      extends DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
+      extends DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> {
     /**
      * The state cell containing a watermark hold for the output of this {@link DoFn}. The hold is
      * acquired during the first {@link DoFn.ProcessElement} call for each element and restriction,
@@ -321,7 +316,7 @@ public class SplittableParDoViaKeyedWorkItems {
       boolean isSeedCall = (timer == null);
       StateNamespace stateNamespace;
       if (isSeedCall) {
-        WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue =
+        WindowedValue<KV<InputT, RestrictionT>> windowedValue =
             Iterables.getOnlyElement(c.element().elementsIterable());
         BoundedWindow window = Iterables.getOnlyElement(windowedValue.getWindows());
         stateNamespace =
@@ -337,27 +332,25 @@ public class SplittableParDoViaKeyedWorkItems {
           stateInternals.state(stateNamespace, restrictionTag);
       WatermarkHoldState holdState = stateInternals.state(stateNamespace, watermarkHoldTag);
 
-      ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
+      KV<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
       if (isSeedCall) {
-        WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue =
+        WindowedValue<KV<InputT, RestrictionT>> windowedValue =
             Iterables.getOnlyElement(c.element().elementsIterable());
-        WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().element());
+        WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().getKey());
         elementState.write(element);
-        elementAndRestriction =
-            ElementAndRestriction.of(element, windowedValue.getValue().restriction());
+        elementAndRestriction = KV.of(element, windowedValue.getValue().getValue());
       } else {
         // This is not the first ProcessElement call for this element/restriction - rather,
         // this is a timer firing, so we need to fetch the element and restriction from state.
         elementState.readLater();
         restrictionState.readLater();
-        elementAndRestriction =
-            ElementAndRestriction.of(elementState.read(), restrictionState.read());
+        elementAndRestriction = KV.of(elementState.read(), restrictionState.read());
       }
 
-      final TrackerT tracker = invoker.invokeNewTracker(elementAndRestriction.restriction());
+      final TrackerT tracker = invoker.invokeNewTracker(elementAndRestriction.getValue());
       SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT>.Result result =
           processElementInvoker.invokeProcessElement(
-              invoker, elementAndRestriction.element(), tracker);
+              invoker, elementAndRestriction.getKey(), tracker);
 
       // Save state for resuming.
       if (result.getResidualRestriction() == null) {
@@ -370,7 +363,7 @@ public class SplittableParDoViaKeyedWorkItems {
       restrictionState.write(result.getResidualRestriction());
       Instant futureOutputWatermark = result.getFutureOutputWatermark();
       if (futureOutputWatermark == null) {
-        futureOutputWatermark = elementAndRestriction.element().getTimestamp();
+        futureOutputWatermark = elementAndRestriction.getKey().getTimestamp();
       }
       holdState.add(futureOutputWatermark);
       // Set a timer to continue processing this element.

http://git-wip-us.apache.org/repos/asf/beam/blob/799173fa/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
index d242431..9543de8 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
@@ -35,7 +35,6 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.Executors;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn;
-import org.apache.beam.runners.core.construction.ElementAndRestriction;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.InstantCoder;
@@ -53,6 +52,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -111,9 +111,7 @@ public class SplittableParDoProcessFnTest {
   private static class ProcessFnTester<
           InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
       implements AutoCloseable {
-    private final DoFnTester<
-            KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
-        tester;
+    private final DoFnTester<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> tester;
     private Instant currentProcessingTime;
 
     private InMemoryTimerInternals timerInternals;
@@ -194,14 +192,13 @@ public class SplittableParDoProcessFnTest {
     void startElement(InputT element, RestrictionT restriction) throws Exception {
       startElement(
           WindowedValue.of(
-              ElementAndRestriction.of(element, restriction),
+              KV.of(element, restriction),
               currentProcessingTime,
               GlobalWindow.INSTANCE,
               PaneInfo.ON_TIME_AND_ONLY_FIRING));
     }
 
-    void startElement(WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue)
-        throws Exception {
+    void startElement(WindowedValue<KV<InputT, RestrictionT>> windowedValue) throws Exception {
       tester.processElement(
           KeyedWorkItems.elementsWorkItem("key", Collections.singletonList(windowedValue)));
     }
@@ -223,8 +220,7 @@ public class SplittableParDoProcessFnTest {
         return false;
       }
       tester.processElement(
-          KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
-              "key", timers));
+          KeyedWorkItems.<String, KV<InputT, RestrictionT>>timersWorkItem("key", timers));
       return true;
     }
 
@@ -309,7 +305,7 @@ public class SplittableParDoProcessFnTest {
             MAX_BUNDLE_DURATION);
     tester.startElement(
         WindowedValue.of(
-            ElementAndRestriction.of(42, new SomeRestriction()),
+            KV.of(42, new SomeRestriction()),
             base,
             Collections.singletonList(w),
             PaneInfo.ON_TIME_AND_ONLY_FIRING));

http://git-wip-us.apache.org/repos/asf/beam/blob/799173fa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index eccc83a..e6b51b7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -35,7 +35,6 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternalsFactory;
-import org.apache.beam.runners.core.construction.ElementAndRestriction;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -43,6 +42,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -54,8 +54,7 @@ import org.joda.time.Instant;
 class SplittableProcessElementsEvaluatorFactory<
         InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
     implements TransformEvaluatorFactory {
-  private final ParDoEvaluatorFactory<
-          KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+  private final ParDoEvaluatorFactory<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
       delegateFactory;
   private final EvaluationContext evaluationContext;
 
@@ -84,14 +83,13 @@ class SplittableProcessElementsEvaluatorFactory<
   }
 
   @SuppressWarnings({"unchecked", "rawtypes"})
-  private TransformEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
-      createEvaluator(
-          AppliedPTransform<
-                  PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>,
-                  PCollectionTuple, ProcessElements<InputT, OutputT, RestrictionT, TrackerT>>
-              application,
-          CommittedBundle<InputT> inputBundle)
-          throws Exception {
+  private TransformEvaluator<KeyedWorkItem<String, KV<InputT, RestrictionT>>> createEvaluator(
+      AppliedPTransform<
+              PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>, PCollectionTuple,
+              ProcessElements<InputT, OutputT, RestrictionT, TrackerT>>
+          application,
+      CommittedBundle<InputT> inputBundle)
+      throws Exception {
     final ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform =
         application.getTransform();
 
@@ -101,9 +99,7 @@ class SplittableProcessElementsEvaluatorFactory<
     DoFnLifecycleManager fnManager = DoFnLifecycleManager.of(processFn);
     processFn =
         ((ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
-            fnManager
-                .<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
-                    get());
+            fnManager.<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>get());
 
     String stepName = evaluationContext.getStepName(application);
     final DirectExecutionContext.DirectStepContext stepContext =
@@ -111,12 +107,12 @@ class SplittableProcessElementsEvaluatorFactory<
             .getExecutionContext(application, inputBundle.getKey())
             .getStepContext(stepName);
 
-    final ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+    final ParDoEvaluator<KeyedWorkItem<String, KV<InputT, RestrictionT>>>
         parDoEvaluator =
             delegateFactory.createParDoEvaluator(
                 application,
                 inputBundle.getKey(),
-                (PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>)
+                (PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>)
                     inputBundle.getPCollection(),
                 transform.getSideInputs(),
                 transform.getMainOutputTag(),
@@ -189,17 +185,16 @@ class SplittableProcessElementsEvaluatorFactory<
   }
 
   private static <InputT, OutputT, RestrictionT>
-  ParDoEvaluator.DoFnRunnerFactory<
-                KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+      ParDoEvaluator.DoFnRunnerFactory<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
           processFnRunnerFactory() {
     return new ParDoEvaluator.DoFnRunnerFactory<
-            KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() {
+        KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>() {
       @Override
       public PushbackSideInputDoFnRunner<
-          KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+          KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
       createRunner(
           PipelineOptions options,
-          DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> fn,
+          DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> fn,
           List<PCollectionView<?>> sideInputs,
           ReadyCheckingSideInputReader sideInputReader,
           OutputManager outputManager,

http://git-wip-us.apache.org/repos/asf/beam/blob/799173fa/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index fef32de..3d7e81f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.core.construction.ElementAndRestriction;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
@@ -548,14 +547,11 @@ class FlinkStreamingTransformTranslators {
           transform.getAdditionalOutputTags().getAll(),
           context,
           new ParDoTranslationHelper.DoFnOperatorFactory<
-              KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() {
+              KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>() {
             @Override
-            public DoFnOperator<
-                KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
-                OutputT> createDoFnOperator(
-                    DoFn<
-                        KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
-                        OutputT> doFn,
+            public DoFnOperator<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
+                createDoFnOperator(
+                    DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> doFn,
                     String stepName,
                     List<PCollectionView<?>> sideInputs,
                     TupleTag<OutputT> mainOutputTag,
@@ -563,11 +559,8 @@ class FlinkStreamingTransformTranslators {
                     FlinkStreamingTranslationContext context,
                     WindowingStrategy<?, ?> windowingStrategy,
                     Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
-                    Coder<
-                        WindowedValue<
-                            KeyedWorkItem<
-                                String,
-                                ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
+                    Coder<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>>
+                        inputCoder,
                     Coder keyCoder,
                     Map<Integer, PCollectionView<?>> transformedSideInputs) {
               return new SplittableDoFnOperator<>(

http://git-wip-us.apache.org/repos/asf/beam/blob/799173fa/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 5d08eba..2f095d4 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -35,7 +35,6 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternalsFactory;
-import org.apache.beam.runners.core.construction.ElementAndRestriction;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -43,6 +42,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -55,18 +55,15 @@ import org.joda.time.Instant;
  * the {@code @ProcessElement} method of a splittable {@link DoFn}.
  */
 public class SplittableDoFnOperator<
-    InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
-    extends DoFnOperator<
-    KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
+        InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+    extends DoFnOperator<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> {
 
   private transient ScheduledExecutorService executorService;
 
   public SplittableDoFnOperator(
-      DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> doFn,
+      DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> doFn,
       String stepName,
-      Coder<
-          WindowedValue<
-              KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
+      Coder<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>> inputCoder,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
       OutputManagerFactory<OutputT> outputManagerFactory,
@@ -87,7 +84,6 @@ public class SplittableDoFnOperator<
         sideInputs,
         options,
         keyCoder);
-
   }
 
   @Override
@@ -151,7 +147,7 @@ public class SplittableDoFnOperator<
   @Override
   public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
     doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
-        KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
+        KeyedWorkItems.<String, KV<InputT, RestrictionT>>timersWorkItem(
             (String) stateInternals.getKey(),
             Collections.singletonList(timer.getNamespace()))));
   }