You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:56 UTC

[32/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
deleted file mode 100644
index 8b066ab..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-
-/**
- * Pubsub transform support code for the Dataflow backend.
- */
-public class PubsubIOTranslator {
-
-  /**
-   * Implements PubsubIO Read translation for the Dataflow backend.
-   */
-  public static class ReadTranslator<T> implements TransformTranslator<PubsubIO.Read.Bound<T>> {
-    @Override
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public void translate(
-        PubsubIO.Read.Bound transform,
-        TranslationContext context) {
-      translateReadHelper(transform, context);
-    }
-
-    private <T> void translateReadHelper(
-        PubsubIO.Read.Bound<T> transform,
-        TranslationContext context) {
-      if (!context.getPipelineOptions().isStreaming()) {
-        throw new IllegalArgumentException(
-            "PubsubIO.Read can only be used with the Dataflow streaming runner.");
-      }
-
-      context.addStep(transform, "ParallelRead");
-      context.addInput(PropertyNames.FORMAT, "pubsub");
-      if (transform.getTopic() != null) {
-        context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path());
-      }
-      if (transform.getSubscription() != null) {
-        context.addInput(
-            PropertyNames.PUBSUB_SUBSCRIPTION, transform.getSubscription().asV1Beta1Path());
-      }
-      if (transform.getTimestampLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
-      }
-      if (transform.getIdLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
-      }
-      context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-    }
-  }
-
-  /**
-   * Implements PubsubIO Write translation for the Dataflow backend.
-   */
-  public static class WriteTranslator<T>
-      implements TransformTranslator<DataflowPipelineRunner.StreamingPubsubIOWrite<T>> {
-
-    @Override
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public void translate(
-        DataflowPipelineRunner.StreamingPubsubIOWrite transform,
-        TranslationContext context) {
-      translateWriteHelper(transform, context);
-    }
-
-    private <T> void translateWriteHelper(
-        DataflowPipelineRunner.StreamingPubsubIOWrite<T> customTransform,
-        TranslationContext context) {
-      if (!context.getPipelineOptions().isStreaming()) {
-        throw new IllegalArgumentException(
-            "PubsubIO.Write is non-primitive for the Dataflow batch runner.");
-      }
-
-      PubsubIO.Write.Bound<T> transform = customTransform.getOverriddenTransform();
-
-      context.addStep(customTransform, "ParallelWrite");
-      context.addInput(PropertyNames.FORMAT, "pubsub");
-      context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path());
-      if (transform.getTimestampLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
-      }
-      if (transform.getIdLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
-      }
-      context.addEncodingInput(WindowedValue.getValueOnlyCoder(transform.getCoder()));
-      context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(customTransform));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java
deleted file mode 100644
index f110e84..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean;
-import static com.google.cloud.dataflow.sdk.util.Structs.addDictionary;
-import static com.google.cloud.dataflow.sdk.util.Structs.addLong;
-
-import com.google.api.services.dataflow.model.SourceMetadata;
-import com.google.cloud.dataflow.sdk.io.FileBasedSource;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.Source;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.values.PValue;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Translator for the {@code Read} {@code PTransform} for the Dataflow back-end.
- */
-public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
-  @Override
-  public void translate(Read.Bounded<?> transform, TranslationContext context) {
-    translateReadHelper(transform.getSource(), transform, context);
-  }
-
-  public static <T> void translateReadHelper(Source<T> source,
-      PTransform<?, ? extends PValue> transform,
-      DataflowPipelineTranslator.TranslationContext context) {
-    try {
-      // TODO: Move this validation out of translation once IOChannelUtils is portable
-      // and can be reconstructed on the worker.
-      if (source instanceof FileBasedSource) {
-        String filePatternOrSpec = ((FileBasedSource<?>) source).getFileOrPatternSpec();
-        context.getPipelineOptions()
-               .getPathValidator()
-               .validateInputFilePatternSupported(filePatternOrSpec);
-      }
-
-      context.addStep(transform, "ParallelRead");
-      context.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
-      context.addInput(
-          PropertyNames.SOURCE_STEP_INPUT,
-          cloudSourceToDictionary(
-              CustomSources.serializeToCloudSource(source, context.getPipelineOptions())));
-      context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  // Represents a cloud Source as a dictionary for encoding inside the {@code SOURCE_STEP_INPUT}
-  // property of CloudWorkflowStep.input.
-  private static Map<String, Object> cloudSourceToDictionary(
-      com.google.api.services.dataflow.model.Source source) {
-    // Do not translate encoding - the source's encoding is translated elsewhere
-    // to the step's output info.
-    Map<String, Object> res = new HashMap<>();
-    addDictionary(res, PropertyNames.SOURCE_SPEC, source.getSpec());
-    if (source.getMetadata() != null) {
-      addDictionary(res, PropertyNames.SOURCE_METADATA,
-          cloudSourceMetadataToDictionary(source.getMetadata()));
-    }
-    if (source.getDoesNotNeedSplitting() != null) {
-      addBoolean(
-          res, PropertyNames.SOURCE_DOES_NOT_NEED_SPLITTING, source.getDoesNotNeedSplitting());
-    }
-    return res;
-  }
-
-  private static Map<String, Object> cloudSourceMetadataToDictionary(SourceMetadata metadata) {
-    Map<String, Object> res = new HashMap<>();
-    if (metadata.getProducesSortedKeys() != null) {
-      addBoolean(res, PropertyNames.SOURCE_PRODUCES_SORTED_KEYS, metadata.getProducesSortedKeys());
-    }
-    if (metadata.getEstimatedSizeBytes() != null) {
-      addLong(res, PropertyNames.SOURCE_ESTIMATED_SIZE_BYTES, metadata.getEstimatedSizeBytes());
-    }
-    if (metadata.getInfinite() != null) {
-      addBoolean(res, PropertyNames.SOURCE_IS_INFINITE, metadata.getInfinite());
-    }
-    return res;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java
deleted file mode 100644
index b6b2ce6..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Implementation of the {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
deleted file mode 100644
index eaea3ed..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
-import com.google.cloud.dataflow.sdk.io.Read.Bounded;
-import com.google.cloud.dataflow.sdk.io.Source.Reader;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
- * for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
- */
-final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
-  /*
-   * An evaluator for a Source is stateful, to ensure data is not read multiple times.
-   * Evaluators are cached here to ensure that the reader is not restarted if the evaluator is
-   * retriggered.
-   */
-  private final ConcurrentMap<EvaluatorKey, Queue<? extends BoundedReadEvaluator<?>>>
-      sourceEvaluators = new ConcurrentHashMap<>();
-
-  /**
-   * Returns the evaluator for the given {@link Bounded Read.Bounded} application. The input bundle
-   * is not used.
-   */
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext)
-      throws IOException {
-    return getTransformEvaluator((AppliedPTransform) application, evaluationContext);
-  }
-
-  /**
-   * Takes the next queued evaluator for {@code transform}, or an {@link EmptyTransformEvaluator}
-   * when the queue has already been drained.
-   */
-  private <OutputT> TransformEvaluator<?> getTransformEvaluator(
-      final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
-      final InProcessEvaluationContext evaluationContext)
-      throws IOException {
-    BoundedReadEvaluator<?> evaluator =
-        getTransformEvaluatorQueue(transform, evaluationContext).poll();
-    if (evaluator == null) {
-      return EmptyTransformEvaluator.create(transform);
-    }
-    return evaluator;
-  }
-
-  /**
-   * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the
-   * provided application of {@link Bounded Read.Bounded}, initializing it if required.
-   *
-   * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has
-   * already done so.
-   */
-  @SuppressWarnings("unchecked")
-  private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
-      final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
-      final InProcessEvaluationContext evaluationContext) {
-    // Key by the application and the context the evaluation is occurring in (which call to
-    // Pipeline#run).
-    EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
-    Queue<BoundedReadEvaluator<OutputT>> cached =
-        (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
-    if (cached != null) {
-      return cached;
-    }
-
-    Queue<BoundedReadEvaluator<OutputT>> created = new ConcurrentLinkedQueue<>();
-    // putIfAbsent returns the queue another invocation registered first, or null if ours won.
-    Queue<BoundedReadEvaluator<OutputT>> winner =
-        (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.putIfAbsent(key, created);
-    if (winner != null) {
-      return winner;
-    }
-    // Only the invocation that registered the queue adds the single evaluator for this
-    // transform.
-    created.offer(new BoundedReadEvaluator<OutputT>(transform, evaluationContext));
-    return created;
-  }
-
-  /**
-   * A {@link BoundedReadEvaluator} produces elements from an underlying {@link BoundedSource},
-   * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
-   * creates the {@link BoundedReader} and consumes all available input.
-   *
-   * <p>A {@link BoundedReadEvaluator} should only be created once per {@link BoundedSource}, and
-   * each evaluator should only be called once per evaluation of the pipeline. Otherwise, the source
-   * may produce duplicate elements.
-   */
-  private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
-    private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform;
-    private final InProcessEvaluationContext evaluationContext;
-
-    public BoundedReadEvaluator(
-        AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
-        InProcessEvaluationContext evaluationContext) {
-      this.transform = transform;
-      this.evaluationContext = evaluationContext;
-    }
-
-    /** Input elements are ignored. */
-    @Override
-    public void processElement(WindowedValue<Object> element) {}
-
-    /**
-     * Reads the source to exhaustion into a new root bundle, placing every element in the global
-     * window at the timestamp reported by the reader.
-     *
-     * @return a result holding the output bundle, with a hold at
-     *     {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
-     * @throws IOException if creating, reading or closing the reader fails
-     */
-    @Override
-    public InProcessTransformResult finishBundle() throws IOException {
-      // The try-with-resources statement is the only place the reader is closed. Calling close()
-      // again inside the block would close it twice, and AutoCloseable does not require close()
-      // to be idempotent.
-      try (Reader<OutputT> reader =
-          transform
-              .getTransform()
-              .getSource()
-              .createReader(evaluationContext.getPipelineOptions())) {
-        boolean contentsRemaining = reader.start();
-        UncommittedBundle<OutputT> output =
-            evaluationContext.createRootBundle(transform.getOutput());
-        while (contentsRemaining) {
-          output.add(
-              WindowedValue.timestampedValueInGlobalWindow(
-                  reader.getCurrent(), reader.getCurrentTimestamp()));
-          contentsRemaining = reader.advance();
-        }
-        return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE)
-            .addOutput(output)
-            .build();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java
deleted file mode 100644
index 3350d2b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * A {@link ExecutorServiceFactory} that produces cached thread pools via
- * {@link Executors#newCachedThreadPool()}.
- */
-class CachedThreadPoolExecutorServiceFactory
-    implements DefaultValueFactory<ExecutorServiceFactory>, ExecutorServiceFactory {
-  private static final CachedThreadPoolExecutorServiceFactory INSTANCE =
-      new CachedThreadPoolExecutorServiceFactory();
-
-  @Override
-  public ExecutorServiceFactory create(PipelineOptions options) {
-    return INSTANCE;
-  }
-
-  @Override
-  public ExecutorService create() {
-    return Executors.newCachedThreadPool();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/Clock.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/Clock.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/Clock.java
deleted file mode 100644
index 11e6ec1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/Clock.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import org.joda.time.Instant;
-
-/**
- * Access to the current time.
- *
- * <p>The single method, {@link #now()}, reports the current time as a Joda-Time {@link Instant}.
- * NOTE(review): presumably this indirection exists so that alternative time sources can be
- * supplied (for example in tests) - confirm against the implementations.
- */
-public interface Clock {
-  /**
-   * Returns the current time as an {@link Instant}.
-   *
-   * @return the current time according to this clock
-   */
-  Instant now();
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java
deleted file mode 100644
index 2792631..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-
-/**
- * A callback for completing a bundle of input.
- *
- * <p>One method reports a successful result; the other reports the {@link Throwable} that caused
- * abnormal termination.
- */
-interface CompletionCallback {
-  /**
-   * Handle a successful result.
-   *
-   * @param inputBundle the bundle of input that was completed. NOTE(review): whether this may be
-   *     {@code null} is not visible here - confirm against the callers.
-   * @param result the result produced for that bundle
-   */
-  void handleResult(CommittedBundle<?> inputBundle, InProcessTransformResult result);
-
-  /**
-   * Handle a result that terminated abnormally due to the provided {@link Throwable}.
-   *
-   * @param inputBundle the bundle of input that was being completed. NOTE(review): whether this
-   *     may be {@code null} is not visible here - confirm against the callers.
-   * @param t the throwable that caused the abnormal termination
-   */
-  void handleThrowable(CommittedBundle<?> inputBundle, Throwable t);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
deleted file mode 100644
index c602b23..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the
- * {@link Pipeline}. This is used to schedule consuming {@link PTransform PTransforms} to consume
- * input after the upstream transform has produced and committed output.
- */
-public class ConsumerTrackingPipelineVisitor implements PipelineVisitor {
-  private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers = new HashMap<>();
-  private Collection<AppliedPTransform<?, ?, ?>> rootTransforms = new ArrayList<>();
-  private Collection<PCollectionView<?>> views = new ArrayList<>();
-  private Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
-  private Set<PValue> toFinalize = new HashSet<>();
-  private int numTransforms = 0;
-  private boolean finalized = false;
-
-  @Override
-  public void enterCompositeTransform(TransformTreeNode node) {
-    checkState(
-        !finalized,
-        "Attempting to traverse a pipeline (node %s) with a %s "
-            + "which has already visited a Pipeline and is finalized",
-        node.getFullName(),
-        ConsumerTrackingPipelineVisitor.class.getSimpleName());
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformTreeNode node) {
-    checkState(
-        !finalized,
-        "Attempting to traverse a pipeline (node %s) with a %s which is already finalized",
-        node.getFullName(),
-        ConsumerTrackingPipelineVisitor.class.getSimpleName());
-    if (node.isRootNode()) {
-      finalized = true;
-    }
-  }
-
-  @Override
-  public void visitTransform(TransformTreeNode node) {
-    toFinalize.removeAll(node.getInput().expand());
-    AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
-    if (node.getInput().expand().isEmpty()) {
-      rootTransforms.add(appliedTransform);
-    } else {
-      for (PValue value : node.getInput().expand()) {
-        valueToConsumers.get(value).add(appliedTransform);
-        stepNames.put(appliedTransform, genStepName());
-      }
-    }
-  }
-
-  private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformTreeNode node) {
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    AppliedPTransform<?, ?, ?> application = AppliedPTransform.of(
-        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) node.getTransform());
-    return application;
-  }
-
-  @Override
-  public void visitValue(PValue value, TransformTreeNode producer) {
-    toFinalize.add(value);
-    for (PValue expandedValue : value.expand()) {
-      valueToConsumers.put(expandedValue, new ArrayList<AppliedPTransform<?, ?, ?>>());
-      if (expandedValue instanceof PCollectionView) {
-        views.add((PCollectionView<?>) expandedValue);
-      }
-      expandedValue.recordAsOutput(getAppliedTransform(producer));
-    }
-    value.recordAsOutput(getAppliedTransform(producer));
-  }
-
-  private String genStepName() {
-    return String.format("s%s", numTransforms++);
-  }
-
-
-  /**
-   * Returns a mapping of each fully-expanded {@link PValue} to each
-   * {@link AppliedPTransform} that consumes it. For each AppliedPTransform in the collection
-   * returned from {@code getValueToCustomers().get(PValue)},
-   * {@code AppliedPTransform#getInput().expand()} will contain the argument {@link PValue}.
-   */
-  public Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> getValueToConsumers() {
-    checkState(
-        finalized,
-        "Can't call getValueToConsumers before the Pipeline has been completely traversed");
-
-    return valueToConsumers;
-  }
-
-  /**
-   * Returns the mapping for each {@link AppliedPTransform} in the {@link Pipeline} to a unique step
-   * name.
-   */
-  public Map<AppliedPTransform<?, ?, ?>, String> getStepNames() {
-    checkState(
-        finalized, "Can't call getStepNames before the Pipeline has been completely traversed");
-
-    return stepNames;
-  }
-
-  /**
-   * Returns the root transforms of the {@link Pipeline}. A root {@link AppliedPTransform} consumes
-   * a {@link PInput} where the {@link PInput#expand()} returns an empty collection.
-   */
-  public Collection<AppliedPTransform<?, ?, ?>> getRootTransforms() {
-    checkState(
-        finalized,
-        "Can't call getRootTransforms before the Pipeline has been completely traversed");
-
-    return rootTransforms;
-  }
-
-  /**
-   * Returns all of the {@link PCollectionView PCollectionViews} contained in the visited
-   * {@link Pipeline}.
-   */
-  public Collection<PCollectionView<?>> getViews() {
-    checkState(finalized, "Can't call getViews before the Pipeline has been completely traversed");
-
-    return views;
-  }
-
-  /**
-   * Returns all of the {@link PValue PValues} that have been produced but not consumed. These
-   * {@link PValue PValues} should be finalized by the {@link PipelineRunner} before the
-   * {@link Pipeline} is executed.
-   */
-  public Set<PValue> getUnfinalizedPValues() {
-    checkState(
-        finalized,
-        "Can't call getUnfinalizedPValues before the Pipeline has been completely traversed");
-
-    return toFinalize;
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EmptyTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EmptyTransformEvaluator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EmptyTransformEvaluator.java
deleted file mode 100644
index fc09237..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EmptyTransformEvaluator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-
-/**
- * A {@link TransformEvaluator} that ignores all input and produces no output. The result of
- * invoking {@link #finishBundle()} on this evaluator is to return an
- * {@link InProcessTransformResult} with no elements and a timestamp hold equal to
- * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. Because the result contains no elements, this hold
- * will not affect the watermark.
- */
-final class EmptyTransformEvaluator<T> implements TransformEvaluator<T> {
-  public static <T> TransformEvaluator<T> create(AppliedPTransform<?, ?, ?> transform) {
-    return new EmptyTransformEvaluator<T>(transform);
-  }
-
-  private final AppliedPTransform<?, ?, ?> transform;
-
-  private EmptyTransformEvaluator(AppliedPTransform<?, ?, ?> transform) {
-    this.transform = transform;
-  }
-
-  @Override
-  public void processElement(WindowedValue<T> element) throws Exception {}
-
-  @Override
-  public InProcessTransformResult finishBundle() throws Exception {
-    return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MIN_VALUE)
-        .build();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java
deleted file mode 100644
index 307bc5c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-
-import java.util.Objects;
-
-/**
- * A (Transform, Pipeline Execution) key for stateful evaluators.
- *
- * Source evaluators are stateful to ensure data is not read multiple times. Evaluators are cached
- * to ensure that the reader is not restarted if the evaluator is retriggered. An
- * {@link EvaluatorKey} is used to ensure that multiple Pipelines can be executed without sharing
- * the same evaluators.
- */
-final class EvaluatorKey {
-  private final AppliedPTransform<?, ?, ?> transform;
-  private final InProcessEvaluationContext context;
-
-  public EvaluatorKey(AppliedPTransform<?, ?, ?> transform, InProcessEvaluationContext context) {
-    this.transform = transform;
-    this.context = context;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(transform, context);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || !(other instanceof EvaluatorKey)) {
-      return false;
-    }
-    EvaluatorKey that = (EvaluatorKey) other;
-    return Objects.equals(this.transform, that.transform)
-        && Objects.equals(this.context, that.context);
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceFactory.java
deleted file mode 100644
index 480bcde..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import java.util.concurrent.ExecutorService;
-
-/**
- * A factory that creates {@link ExecutorService ExecutorServices}.
- * {@link ExecutorService ExecutorServices} created by this factory should be independent of one
- * another (e.g., if any executor is shut down the remaining executors should continue to process
- * work).
- */
-public interface ExecutorServiceFactory {
-  /**
-   * Create a new {@link ExecutorService}.
-   */
-  ExecutorService create();
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
deleted file mode 100644
index 68a1b8c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
-import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
-import com.google.cloud.dataflow.sdk.util.TimeDomain;
-import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-
-import javax.annotation.Nullable;
-
-/**
- * An {@link InProcessExecutor} that uses an underlying {@link ExecutorService} and
- * {@link InProcessEvaluationContext} to execute a {@link Pipeline}.
- */
-final class ExecutorServiceParallelExecutor implements InProcessExecutor {
-  private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);
-
-  private final ExecutorService executorService;
-
-  private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
-  private final Set<PValue> keyedPValues;
-  private final TransformEvaluatorRegistry registry;
-  private final InProcessEvaluationContext evaluationContext;
-
-  private final ConcurrentMap<StepAndKey, TransformExecutorService> currentEvaluations;
-  private final ConcurrentMap<TransformExecutor<?>, Boolean> scheduledExecutors;
-
-  private final Queue<ExecutorUpdate> allUpdates;
-  private final BlockingQueue<VisibleExecutorUpdate> visibleUpdates;
-
-  private final TransformExecutorService parallelExecutorService;
-  private final CompletionCallback defaultCompletionCallback;
-
-  private Collection<AppliedPTransform<?, ?, ?>> rootNodes;
-
-  public static ExecutorServiceParallelExecutor create(
-      ExecutorService executorService,
-      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
-      Set<PValue> keyedPValues,
-      TransformEvaluatorRegistry registry,
-      InProcessEvaluationContext context) {
-    return new ExecutorServiceParallelExecutor(
-        executorService, valueToConsumers, keyedPValues, registry, context);
-  }
-
-  private ExecutorServiceParallelExecutor(
-      ExecutorService executorService,
-      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
-      Set<PValue> keyedPValues,
-      TransformEvaluatorRegistry registry,
-      InProcessEvaluationContext context) {
-    this.executorService = executorService;
-    this.valueToConsumers = valueToConsumers;
-    this.keyedPValues = keyedPValues;
-    this.registry = registry;
-    this.evaluationContext = context;
-
-    currentEvaluations = new ConcurrentHashMap<>();
-    scheduledExecutors = new ConcurrentHashMap<>();
-
-    this.allUpdates = new ConcurrentLinkedQueue<>();
-    this.visibleUpdates = new ArrayBlockingQueue<>(20);
-
-    parallelExecutorService =
-        TransformExecutorServices.parallel(executorService, scheduledExecutors);
-    defaultCompletionCallback = new DefaultCompletionCallback();
-  }
-
-  @Override
-  public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
-    rootNodes = ImmutableList.copyOf(roots);
-    Runnable monitorRunnable = new MonitorRunnable();
-    executorService.submit(monitorRunnable);
-  }
-
-  @SuppressWarnings("unchecked")
-  public void scheduleConsumption(
-      AppliedPTransform<?, ?, ?> consumer,
-      @Nullable CommittedBundle<?> bundle,
-      CompletionCallback onComplete) {
-    evaluateBundle(consumer, bundle, onComplete);
-  }
-
-  private <T> void evaluateBundle(
-      final AppliedPTransform<?, ?, ?> transform,
-      @Nullable final CommittedBundle<T> bundle,
-      final CompletionCallback onComplete) {
-    TransformExecutorService transformExecutor;
-    if (bundle != null && isKeyed(bundle.getPCollection())) {
-      final StepAndKey stepAndKey =
-          StepAndKey.of(transform, bundle == null ? null : bundle.getKey());
-      transformExecutor = getSerialExecutorService(stepAndKey);
-    } else {
-      transformExecutor = parallelExecutorService;
-    }
-    TransformExecutor<T> callable =
-        TransformExecutor.create(
-            registry, evaluationContext, bundle, transform, onComplete, transformExecutor);
-    transformExecutor.schedule(callable);
-  }
-
-  private boolean isKeyed(PValue pvalue) {
-    return keyedPValues.contains(pvalue);
-  }
-
-  private void scheduleConsumers(CommittedBundle<?> bundle) {
-    for (AppliedPTransform<?, ?, ?> consumer : valueToConsumers.get(bundle.getPCollection())) {
-      scheduleConsumption(consumer, bundle, defaultCompletionCallback);
-    }
-  }
-
-  private TransformExecutorService getSerialExecutorService(StepAndKey stepAndKey) {
-    if (!currentEvaluations.containsKey(stepAndKey)) {
-      currentEvaluations.putIfAbsent(
-          stepAndKey, TransformExecutorServices.serial(executorService, scheduledExecutors));
-    }
-    return currentEvaluations.get(stepAndKey);
-  }
-
-  @Override
-  public void awaitCompletion() throws Throwable {
-    VisibleExecutorUpdate update;
-    do {
-      update = visibleUpdates.take();
-      if (update.throwable.isPresent()) {
-        throw update.throwable.get();
-      }
-    } while (!update.isDone());
-    executorService.shutdown();
-  }
-
-  /**
-   * The default {@link CompletionCallback}. The default completion callback is used to complete
-   * transform evaluations that are triggered due to the arrival of elements from an upstream
-   * transform, or for a source transform.
-   */
-  private class DefaultCompletionCallback implements CompletionCallback {
-    @Override
-    public void handleResult(CommittedBundle<?> inputBundle, InProcessTransformResult result) {
-      Iterable<? extends CommittedBundle<?>> resultBundles =
-          evaluationContext.handleResult(inputBundle, Collections.<TimerData>emptyList(), result);
-      for (CommittedBundle<?> outputBundle : resultBundles) {
-        allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
-      }
-    }
-
-    @Override
-    public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
-      allUpdates.offer(ExecutorUpdate.fromThrowable(t));
-    }
-  }
-
-  /**
-   * A {@link CompletionCallback} where the completed bundle was produced to deliver some collection
-   * of {@link TimerData timers}. When the evaluator completes successfully, reports all of the
-   * timers used to create the input to the {@link InProcessEvaluationContext evaluation context}
-   * as part of the result.
-   */
-  private class TimerCompletionCallback implements CompletionCallback {
-    private final Iterable<TimerData> timers;
-
-    private TimerCompletionCallback(Iterable<TimerData> timers) {
-      this.timers = timers;
-    }
-
-    @Override
-    public void handleResult(CommittedBundle<?> inputBundle, InProcessTransformResult result) {
-      Iterable<? extends CommittedBundle<?>> resultBundles =
-          evaluationContext.handleResult(inputBundle, timers, result);
-      for (CommittedBundle<?> outputBundle : resultBundles) {
-        allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
-      }
-    }
-
-    @Override
-    public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
-      allUpdates.offer(ExecutorUpdate.fromThrowable(t));
-    }
-  }
-
-  /**
-   * An internal status update on the state of the executor.
-   *
-   * Used to signal when the executor should be shut down (due to an exception).
-   */
-  private static class ExecutorUpdate {
-    private final Optional<? extends CommittedBundle<?>> bundle;
-    private final Optional<? extends Throwable> throwable;
-
-    public static ExecutorUpdate fromBundle(CommittedBundle<?> bundle) {
-      return new ExecutorUpdate(bundle, null);
-    }
-
-    public static ExecutorUpdate fromThrowable(Throwable t) {
-      return new ExecutorUpdate(null, t);
-    }
-
-    private ExecutorUpdate(CommittedBundle<?> producedBundle, Throwable throwable) {
-      this.bundle = Optional.fromNullable(producedBundle);
-      this.throwable = Optional.fromNullable(throwable);
-    }
-
-    public Optional<? extends CommittedBundle<?>> getBundle() {
-      return bundle;
-    }
-
-    public Optional<? extends Throwable> getException() {
-      return throwable;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(ExecutorUpdate.class)
-          .add("bundle", bundle)
-          .add("exception", throwable)
-          .toString();
-    }
-  }
-
-  /**
-   * An update of interest to the user. Used in {@link #awaitCompletion} to decide whether to
-   * return normally or throw an exception.
-   */
-  private static class VisibleExecutorUpdate {
-    private final Optional<? extends Throwable> throwable;
-    private final boolean done;
-
-    public static VisibleExecutorUpdate fromThrowable(Throwable e) {
-      return new VisibleExecutorUpdate(false, e);
-    }
-
-    public static VisibleExecutorUpdate finished() {
-      return new VisibleExecutorUpdate(true, null);
-    }
-
-    private VisibleExecutorUpdate(boolean done, @Nullable Throwable exception) {
-      this.throwable = Optional.fromNullable(exception);
-      this.done = done;
-    }
-
-    public boolean isDone() {
-      return done;
-    }
-  }
-
-  private class MonitorRunnable implements Runnable {
-    private final String runnableName =
-        String.format(
-            "%s$%s-monitor",
-            evaluationContext.getPipelineOptions().getAppName(),
-            ExecutorServiceParallelExecutor.class.getSimpleName());
-
-    @Override
-    public void run() {
-      String oldName = Thread.currentThread().getName();
-      Thread.currentThread().setName(runnableName);
-      try {
-        ExecutorUpdate update = allUpdates.poll();
-        if (update != null) {
-          LOG.debug("Executor Update: {}", update);
-          if (update.getBundle().isPresent()) {
-            scheduleConsumers(update.getBundle().get());
-          } else if (update.getException().isPresent()) {
-            visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
-          }
-        }
-        boolean timersFired = fireTimers();
-        addWorkIfNecessary(timersFired);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOG.error("Monitor died due to being interrupted");
-        while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(e))) {
-          visibleUpdates.poll();
-        }
-      } catch (Throwable t) {
-        LOG.error("Monitor thread died due to throwable", t);
-        while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(t))) {
-          visibleUpdates.poll();
-        }
-      } finally {
-        if (!shouldShutdown()) {
-          // The monitor thread should always be scheduled; but we only need to be scheduled once
-          executorService.submit(this);
-        }
-        Thread.currentThread().setName(oldName);
-      }
-    }
-
-    /**
-     * Fires any available timers. Returns true if at least one timer was fired.
-     */
-    private boolean fireTimers() throws Exception {
-      try {
-        boolean firedTimers = false;
-        for (Map.Entry<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> transformTimers :
-            evaluationContext.extractFiredTimers().entrySet()) {
-          AppliedPTransform<?, ?, ?> transform = transformTimers.getKey();
-          for (Map.Entry<Object, FiredTimers> keyTimers : transformTimers.getValue().entrySet()) {
-            for (TimeDomain domain : TimeDomain.values()) {
-              Collection<TimerData> delivery = keyTimers.getValue().getTimers(domain);
-              if (delivery.isEmpty()) {
-                continue;
-              }
-              KeyedWorkItem<Object, Object> work =
-                  KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery);
-              @SuppressWarnings({"unchecked", "rawtypes"})
-              CommittedBundle<?> bundle =
-                  InProcessBundle.<KeyedWorkItem<Object, Object>>keyed(
-                          (PCollection) transform.getInput(), keyTimers.getKey())
-                      .add(WindowedValue.valueInEmptyWindows(work))
-                      .commit(Instant.now());
-              scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery));
-              firedTimers = true;
-            }
-          }
-        }
-        return firedTimers;
-      } catch (Exception e) {
-        LOG.error("Internal Error while delivering timers", e);
-        throw e;
-      }
-    }
-
-    private boolean shouldShutdown() {
-      if (evaluationContext.isDone()) {
-        LOG.debug("Pipeline is finished. Shutting down. {}");
-        while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) {
-          visibleUpdates.poll();
-        }
-        executorService.shutdown();
-        return true;
-      }
-      return false;
-    }
-
-    /**
-     * If all active {@link TransformExecutor TransformExecutors} are in a blocked state,
-     * add more work from root nodes that may have additional work. This ensures that if a pipeline
-     * has elements available from the root nodes it will add those elements when necessary.
-     */
-    private void addWorkIfNecessary(boolean firedTimers) {
-      // If any timers have fired, they will add more work; We don't need to add more
-      if (firedTimers) {
-        return;
-      }
-      for (TransformExecutor<?> executor : scheduledExecutors.keySet()) {
-        if (!isExecutorBlocked(executor)) {
-          // We have at least one executor that can proceed without adding additional work
-          return;
-        }
-      }
-      // All current TransformExecutors are blocked; add more work from the roots.
-      for (AppliedPTransform<?, ?, ?> root : rootNodes) {
-        if (!evaluationContext.isDone(root)) {
-          scheduleConsumption(root, null, defaultCompletionCallback);
-        }
-      }
-    }
-
-    /**
-     * Return true if the provided executor might make more progress if no action is taken.
-     *
-     * <p>May return false even if all executor threads are currently blocked or cleaning up, as
-     * these can cause more work to be scheduled. If this does not occur, after these calls
-     * terminate, future calls will return true if all executors are waiting.
-     */
-    private boolean isExecutorBlocked(TransformExecutor<?> executor) {
-      Thread thread = executor.getThread();
-      if (thread == null) {
-        return false;
-      }
-      switch (thread.getState()) {
-        case TERMINATED:
-          throw new IllegalStateException(String.format(
-              "Unexpectedly encountered a Terminated TransformExecutor %s", executor));
-        case WAITING:
-        case TIMED_WAITING:
-          // The thread is waiting for some external input. Adding more work may cause the thread
-          // to stop waiting (e.g. the thread is waiting on an unbounded side input)
-          return true;
-        case BLOCKED:
-          // The executor is blocked on acquisition of a java monitor. This usually means it is
-          // making a call to the EvaluationContext, but not a model-blocking call - and will
-          // eventually complete, at which point we may reevaluate.
-        default:
-          // NEW and RUNNABLE threads can make progress
-          return false;
-      }
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
deleted file mode 100644
index ce315be..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.Flatten.FlattenPCollectionList;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the {@link Flatten}
- * {@link PTransform}.
- */
-class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<InputT> evaluator = (TransformEvaluator<InputT>) createInMemoryEvaluator(
-            (AppliedPTransform) application, inputBundle, evaluationContext);
-    return evaluator;
-  }
-
-  private <InputT> TransformEvaluator<InputT> createInMemoryEvaluator(
-      final AppliedPTransform<
-              PCollectionList<InputT>, PCollection<InputT>, FlattenPCollectionList<InputT>>
-          application,
-      final CommittedBundle<InputT> inputBundle,
-      final InProcessEvaluationContext evaluationContext) {
-    if (inputBundle == null) {
-      // it is impossible to call processElement on a flatten with no input bundle. A Flatten with
-      // no input bundle occurs as an output of Flatten.pcollections(PCollectionList.empty())
-      return new FlattenEvaluator<>(
-          null, StepTransformResult.withoutHold(application).build());
-    }
-    final UncommittedBundle<InputT> outputBundle =
-        evaluationContext.createBundle(inputBundle, application.getOutput());
-    final InProcessTransformResult result =
-        StepTransformResult.withoutHold(application).addOutput(outputBundle).build();
-    return new FlattenEvaluator<>(outputBundle, result);
-  }
-
-  private static class FlattenEvaluator<InputT> implements TransformEvaluator<InputT> {
-    private final UncommittedBundle<InputT> outputBundle;
-    private final InProcessTransformResult result;
-
-    public FlattenEvaluator(
-        UncommittedBundle<InputT> outputBundle, InProcessTransformResult result) {
-      this.outputBundle = outputBundle;
-      this.result = result;
-    }
-
-    @Override
-    public void processElement(WindowedValue<InputT> element) {
-      outputBundle.add(element);
-    }
-
-    @Override
-    public InProcessTransformResult finishBundle() {
-      return result;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ForwardingPTransform.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ForwardingPTransform.java
deleted file mode 100644
index b736e35..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ForwardingPTransform.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.TypedPValue;
-
-/**
- * A base class for implementing {@link PTransform} overrides, which behave identically to the
- * delegate transform but with overridden methods. Implementors are required to implement
- * {@link #delegate()}, which returns the object to forward calls to, and {@link #apply(PInput)}.
- */
-public abstract class ForwardingPTransform<InputT extends PInput, OutputT extends POutput>
-    extends PTransform<InputT, OutputT> {
-  protected abstract PTransform<InputT, OutputT> delegate();
-
-  @Override
-  public OutputT apply(InputT input) {
-    return delegate().apply(input);
-  }
-
-  @Override
-  public void validate(InputT input) {
-    delegate().validate(input);
-  }
-
-  @Override
-  public String getName() {
-    return delegate().getName();
-  }
-
-  @Override
-  public <T> Coder<T> getDefaultOutputCoder(InputT input, @SuppressWarnings("unused")
-      TypedPValue<T> output) throws CannotProvideCoderException {
-    return delegate().getDefaultOutputCoder(input, output);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
deleted file mode 100644
index 3ec4af1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult.Builder;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey.ReifyTimestampsAndWindows;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
-import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
-import com.google.cloud.dataflow.sdk.util.KeyedWorkItemCoder;
-import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
-import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.annotations.VisibleForTesting;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the {@link GroupByKey}
- * {@link PTransform}.
- */
-class GroupByKeyEvaluatorFactory implements TransformEvaluatorFactory {
-  /**
-   * Creates the evaluator for {@code application} over {@code inputBundle}.
-   *
-   * <p>The wildcard-typed arguments are handed to {@code createEvaluator} through raw casts, so
-   * the element types it declares are not checked by the compiler at this call site.
-   */
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<InputT> groupByKeyEvaluator =
-        createEvaluator(
-            (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext);
-    return groupByKeyEvaluator;
-  }
-
-  /**
-   * Builds a {@link GroupByKeyEvaluator} from the evaluation context, the input bundle and the
-   * applied {@link InProcessGroupByKeyOnly} transform.
-   */
-  private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(
-      final AppliedPTransform<
-              PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
-              InProcessGroupByKeyOnly<K, V>>
-          application,
-      final CommittedBundle<KV<K, V>> inputBundle,
-      final InProcessEvaluationContext evaluationContext) {
-    GroupByKeyEvaluator<K, V> evaluator =
-        new GroupByKeyEvaluator<K, V>(evaluationContext, inputBundle, application);
-    return evaluator;
-  }
-
-  /**
-   * A {@link TransformEvaluator} that buffers the values of the elements it is given by key and,
-   * when the bundle is finished, outputs one {@link KeyedWorkItem} per distinct key, each in its
-   * own keyed bundle.
-   *
-   * <p>Keys are compared by the bytes produced by the input's key {@link Coder}, not by
-   * {@link Object#equals(Object)}.
-   */
-  private static class GroupByKeyEvaluator<K, V>
-      implements TransformEvaluator<KV<K, WindowedValue<V>>> {
-    private final InProcessEvaluationContext evaluationContext;
-
-    // NOTE(review): declared with element type KV<K, V> although processElement receives
-    // KV<K, WindowedValue<V>>; the declared element type looks inaccurate -- confirm.
-    private final CommittedBundle<KV<K, V>> inputBundle;
-    private final AppliedPTransform<
-            PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
-            InProcessGroupByKeyOnly<K, V>>
-        application;
-    private final Coder<K> keyCoder;
-    /** The values seen so far, grouped by encoded key. */
-    private final Map<GroupingKey<K>, List<WindowedValue<V>>> groupingMap;
-
-    /**
-     * Creates an evaluator for one input bundle of {@code application}.
-     *
-     * @throws IllegalStateException if the coder of the application's input is not a
-     *     {@link KvCoder}
-     */
-    public GroupByKeyEvaluator(
-        InProcessEvaluationContext evaluationContext,
-        CommittedBundle<KV<K, V>> inputBundle,
-        AppliedPTransform<
-                PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
-                InProcessGroupByKeyOnly<K, V>>
-            application) {
-      this.evaluationContext = evaluationContext;
-      this.inputBundle = inputBundle;
-      this.application = application;
-
-      PCollection<KV<K, WindowedValue<V>>> input = application.getInput();
-      keyCoder = getKeyCoder(input.getCoder());
-      groupingMap = new HashMap<>();
-    }
-
-    /**
-     * Extracts the key coder from {@code coder}.
-     *
-     * @throws IllegalStateException if {@code coder} is not a {@link KvCoder}
-     */
-    private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) {
-      if (!(coder instanceof KvCoder)) {
-        throw new IllegalStateException(
-            String.format("Expected the input to use a KvCoder, but the coder was %s", coder));
-      }
-      @SuppressWarnings("unchecked")
-      Coder<K> keyCoder = ((KvCoder<K, WindowedValue<V>>) coder).getKeyCoder();
-      return keyCoder;
-    }
-
-    /**
-     * Adds the value of {@code element} to the group for its key.
-     *
-     * @throws IllegalArgumentException if the key cannot be encoded with the key coder
-     */
-    @Override
-    public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) {
-      KV<K, WindowedValue<V>> kv = element.getValue();
-      K key = kv.getKey();
-      byte[] encodedKey;
-      try {
-        encodedKey = encodeToByteArray(keyCoder, key);
-      } catch (CoderException exn) {
-        // TODO: Put in better element printing:
-        // truncate if too long.
-        // Report the applied transform rather than this evaluator: the evaluator does not
-        // override toString(), so formatting it would only print a class name and hash.
-        throw new IllegalArgumentException(
-            String.format(
-                "unable to encode key %s of input to %s using %s", key, application, keyCoder),
-            exn);
-      }
-      GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
-      List<WindowedValue<V>> values = groupingMap.get(groupingKey);
-      if (values == null) {
-        values = new ArrayList<>();
-        groupingMap.put(groupingKey, values);
-      }
-      values.add(kv.getValue());
-    }
-
-    /**
-     * Emits one keyed bundle per grouped key, each holding a single {@link KeyedWorkItem} with
-     * that key's buffered values, and returns a result that lists those bundles as outputs.
-     */
-    @Override
-    public InProcessTransformResult finishBundle() {
-      Builder resultBuilder = StepTransformResult.withoutHold(application);
-      for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry :
-          groupingMap.entrySet()) {
-        K key = groupedEntry.getKey().key;
-        KeyedWorkItem<K, V> groupedKv =
-            KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
-        UncommittedBundle<KeyedWorkItem<K, V>> bundle =
-            evaluationContext.createKeyedBundle(inputBundle, key, application.getOutput());
-        bundle.add(WindowedValue.valueInEmptyWindows(groupedKv));
-        resultBuilder.addOutput(bundle);
-      }
-      return resultBuilder.build();
-    }
-
-    /**
-     * A map key that pairs a key with its encoded bytes. Equality and hashing use only the
-     * encoded bytes; the key itself is kept so that it can be emitted later.
-     */
-    private static class GroupingKey<K> {
-      private final K key;
-      private final byte[] encodedKey;
-
-      public GroupingKey(K key, byte[] encodedKey) {
-        this.key = key;
-        this.encodedKey = encodedKey;
-      }
-
-      @Override
-      public boolean equals(Object o) {
-        if (o instanceof GroupingKey) {
-          GroupingKey<?> that = (GroupingKey<?>) o;
-          return Arrays.equals(this.encodedKey, that.encodedKey);
-        } else {
-          return false;
-        }
-      }
-
-      @Override
-      public int hashCode() {
-        return Arrays.hashCode(encodedKey);
-      }
-    }
-  }
-
-  /**
-   * A composite {@link PTransform} that implements the {@link GroupByKey} primitive in memory.
-   * Every call except {@link #apply} is forwarded to the wrapped {@link GroupByKey}.
-   */
-  public static final class InProcessGroupByKey<K, V>
-      extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
-    private final GroupByKey<K, V> original;
-
-    private InProcessGroupByKey(GroupByKey<K, V> from) {
-      this.original = from;
-    }
-
-    @Override
-    public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> delegate() {
-      return original;
-    }
-
-    /**
-     * Expands the grouping into three steps: reify each element's timestamp and windows into its
-     * value, group by key only, then group each key's values by window.
-     */
-    @Override
-    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-      KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
-      Coder<K> keyCoder = inputCoder.getKeyCoder();
-      Coder<V> valueCoder = inputCoder.getValueCoder();
-
-      // Grouping is by key and window together, merging windows as needed. The windows are
-      // those already assigned to the input elements, and merging follows the window function
-      // of the input PCollection.
-      WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
-      // The default GroupAlsoByWindow implementation does the per-window grouping.
-      DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow =
-          groupAlsoByWindow(windowingStrategy, valueCoder);
-
-      return input
-          // Move each element's timestamp and assigned windows into the value part.
-          .apply(new ReifyTimestampsAndWindows<K, V>())
-
-          // Collect the values of each key into a single keyed work item.
-          .apply(new InProcessGroupByKeyOnly<K, V>())
-          .setCoder(
-              KeyedWorkItemCoder.of(
-                  keyCoder,
-                  valueCoder,
-                  input.getWindowingStrategy().getWindowFn().windowCoder()))
-
-          // Split each key's values by window, merging windows as needed.
-          .apply("GroupAlsoByWindow", ParDo.of(groupAlsoByWindow))
-
-          // The output uses the windowing strategy as updated by the original GroupByKey.
-          .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy))
-          .setCoder(KvCoder.of(keyCoder, IterableCoder.of(valueCoder)));
-    }
-
-    /**
-     * Creates the {@link DoFn} that groups the values of a {@link KeyedWorkItem} by window,
-     * buffering them with {@code valueCoder}.
-     */
-    private <W extends BoundedWindow>
-        DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow(
-            WindowingStrategy<?, W> windowingStrategy, Coder<V> valueCoder) {
-      return GroupAlsoByWindowViaWindowSetDoFn.create(
-          windowingStrategy, SystemReduceFn.<K, V, W>buffering(valueCoder));
-    }
-  }
-
-  /**
-   * A primitive {@link PTransform} used when evaluating a {@link GroupByKey}. It turns keyed,
-   * window-reified values into {@link KeyedWorkItem KeyedWorkItems}.
-   */
-  public static final class InProcessGroupByKeyOnly<K, V>
-      extends PTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>> {
-    @VisibleForTesting
-    InProcessGroupByKeyOnly() {}
-
-    /**
-     * Creates the primitive output {@link PCollection}, which takes its pipeline, windowing
-     * strategy and boundedness from {@code input}.
-     */
-    @Override
-    public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, WindowedValue<V>>> input) {
-      PCollection<KeyedWorkItem<K, V>> output =
-          PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal(
-              input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
-      return output;
-    }
-  }
-}