Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:53 UTC

[29/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java
deleted file mode 100644
index 34efdf6..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.common.base.MoreObjects;
-
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Static factory methods for constructing instances of {@link TransformExecutorService}.
- */
-final class TransformExecutorServices {
-  private TransformExecutorServices() {
-    // Do not instantiate
-  }
-
-  /**
-   * Returns a {@link TransformExecutorService} that evaluates
-   * {@link TransformExecutor TransformExecutors} in parallel.
-   */
-  public static TransformExecutorService parallel(
-      ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
-    return new ParallelEvaluationState(executor, scheduled);
-  }
-
-  /**
-   * Returns a {@link TransformExecutorService} that evaluates
-   * {@link TransformExecutor TransformExecutors} serially.
-   */
-  public static TransformExecutorService serial(
-      ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
-    return new SerialEvaluationState(executor, scheduled);
-  }
-
-  /**
-   * A {@link TransformExecutorService} with unlimited parallelism. Any {@link TransformExecutor}
-   * scheduled will be immediately submitted to the {@link ExecutorService}.
-   *
-   * <p>A principal use of this is for the evaluation of an unkeyed Step. Unkeyed computations are
-   * processed in parallel.
-   */
-  private static class ParallelEvaluationState implements TransformExecutorService {
-    private final ExecutorService executor;
-    private final Map<TransformExecutor<?>, Boolean> scheduled;
-
-    private ParallelEvaluationState(
-        ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
-      this.executor = executor;
-      this.scheduled = scheduled;
-    }
-
-    @Override
-    public void schedule(TransformExecutor<?> work) {
-      executor.submit(work);
-      scheduled.put(work, true);
-    }
-
-    @Override
-    public void complete(TransformExecutor<?> completed) {
-      scheduled.remove(completed);
-    }
-  }
-
-  /**
-   * A {@link TransformExecutorService} with a single work queue. Any {@link TransformExecutor}
-   * scheduled will be placed on the work queue. Only one item of work will be submitted to the
-   * {@link ExecutorService} at any time.
-   *
-   * <p>A principal use of this is for the serial evaluation of a (Step, Key) pair.
-   * Keyed computations are processed serially per step.
-   */
-  private static class SerialEvaluationState implements TransformExecutorService {
-    private final ExecutorService executor;
-    private final Map<TransformExecutor<?>, Boolean> scheduled;
-
-    private final AtomicReference<TransformExecutor<?>> currentlyEvaluating;
-    private final Queue<TransformExecutor<?>> workQueue;
-
-    private SerialEvaluationState(
-        ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
-      this.scheduled = scheduled;
-      this.executor = executor;
-      this.currentlyEvaluating = new AtomicReference<>();
-      this.workQueue = new ConcurrentLinkedQueue<>();
-    }
-
-    /**
-     * Schedules the work: it is added to the work queue, and is submitted to the executor
-     * immediately if no other work is currently being evaluated; otherwise it remains queued
-     * until the in-progress work completes.
-     */
-    @Override
-    public void schedule(TransformExecutor<?> work) {
-      workQueue.offer(work);
-      updateCurrentlyEvaluating();
-    }
-
-    @Override
-    public void complete(TransformExecutor<?> completed) {
-      if (!currentlyEvaluating.compareAndSet(completed, null)) {
-        throw new IllegalStateException(
-            "Finished work "
-                + completed
-                + " but could not complete due to unexpected currently executing "
-                + currentlyEvaluating.get());
-      }
-      scheduled.remove(completed);
-      updateCurrentlyEvaluating();
-    }
-
-    private void updateCurrentlyEvaluating() {
-      if (currentlyEvaluating.get() == null) {
-        // Only synchronize if we need to update what's currently evaluating
-        synchronized (this) {
-          TransformExecutor<?> newWork = workQueue.poll();
-          if (newWork != null) {
-            if (currentlyEvaluating.compareAndSet(null, newWork)) {
-              scheduled.put(newWork, true);
-              executor.submit(newWork);
-            } else {
-              workQueue.offer(newWork);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(SerialEvaluationState.class)
-          .add("currentlyEvaluating", currentlyEvaluating)
-          .add("workQueue", workQueue)
-          .toString();
-    }
-  }
-}
-
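
The serial path above is the interesting one: an AtomicReference tracks the work
currently running while a ConcurrentLinkedQueue holds pending work, and the
compare-and-set in updateCurrentlyEvaluating guarantees at most one submission at a
time. A minimal standalone sketch of the same pattern, with Runnable standing in for
TransformExecutor (the names below are illustrative, not SDK types):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;

/** Runs at most one queued task at a time on the wrapped executor. */
final class SerialScheduler {
  private final ExecutorService executor;
  private final AtomicReference<Runnable> current = new AtomicReference<>();
  private final Queue<Runnable> workQueue = new ConcurrentLinkedQueue<>();

  SerialScheduler(ExecutorService executor) {
    this.executor = executor;
  }

  void schedule(Runnable work) {
    workQueue.offer(work);
    maybeSubmitNext();
  }

  /** Each task must report completion, mirroring complete() above. */
  void complete(Runnable finished) {
    current.compareAndSet(finished, null);
    maybeSubmitNext();
  }

  private void maybeSubmitNext() {
    if (current.get() == null) {
      // Only synchronize when a new submission may be needed, as the original does.
      synchronized (this) {
        Runnable next = workQueue.poll();
        if (next != null) {
          if (current.compareAndSet(null, next)) {
            executor.submit(next);
          } else {
            workQueue.offer(next); // lost the race; other work is already running
          }
        }
      }
    }
  }
}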

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
deleted file mode 100644
index 549afab..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.io.Read.Unbounded;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource.CheckpointMark;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
- * for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}.
- */
-class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
-  /*
-   * An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted.
-   * Evaluators are cached here to ensure that the checkpoint mark is appropriately reused
-   * and any splits are honored.
-   */
-  private final ConcurrentMap<EvaluatorKey, Queue<? extends UnboundedReadEvaluator<?>>>
-      sourceEvaluators = new ConcurrentHashMap<>();
-
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) {
-    return getTransformEvaluator((AppliedPTransform) application, evaluationContext);
-  }
-
-  private <OutputT> TransformEvaluator<?> getTransformEvaluator(
-      final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
-      final InProcessEvaluationContext evaluationContext) {
-    UnboundedReadEvaluator<?> currentEvaluator =
-        getTransformEvaluatorQueue(transform, evaluationContext).poll();
-    if (currentEvaluator == null) {
-      return EmptyTransformEvaluator.create(transform);
-    }
-    return currentEvaluator;
-  }
-
-  /**
-   * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the
-   * provided application of {@link Unbounded Read.Unbounded}, initializing it if required.
-   *
-   * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has
-   * already done so.
-   */
-  @SuppressWarnings("unchecked")
-  private <OutputT> Queue<UnboundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
-      final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
-      final InProcessEvaluationContext evaluationContext) {
-    // Key by the application and the context the evaluation is occurring in (which call to
-    // Pipeline#run).
-    EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
-    @SuppressWarnings("unchecked")
-    Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue =
-        (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
-    if (evaluatorQueue == null) {
-      evaluatorQueue = new ConcurrentLinkedQueue<>();
-      if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
-        // No queue existed for this key, so seed the newly created queue with an
-        // evaluator to initialize evaluation of this transform
-        UnboundedReadEvaluator<OutputT> evaluator =
-            new UnboundedReadEvaluator<OutputT>(transform, evaluationContext, evaluatorQueue);
-        evaluatorQueue.offer(evaluator);
-      } else {
-        // Otherwise use the existing queue that another thread registered before us
-        evaluatorQueue = (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
-      }
-    }
-    return evaluatorQueue;
-  }
-
-  /**
-   * An {@link UnboundedReadEvaluator} produces elements from an underlying {@link UnboundedSource},
-   * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
-   * creates the {@link UnboundedReader} and consumes some currently available input.
-   *
-   * <p>An {@link UnboundedReadEvaluator} is not internally thread-safe, and should only be
-   * used by a single thread at a time. Each {@link UnboundedReadEvaluator} maintains its own
-   * checkpoint, and constructs its reader from the current checkpoint in each call to
-   * {@link #finishBundle()}.
-   */
-  private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
-    private static final int ARBITRARY_MAX_ELEMENTS = 10;
-    private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
-    private final InProcessEvaluationContext evaluationContext;
-    private final Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
-    private CheckpointMark checkpointMark;
-
-    public UnboundedReadEvaluator(
-        AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
-        InProcessEvaluationContext evaluationContext,
-        Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) {
-      this.transform = transform;
-      this.evaluationContext = evaluationContext;
-      this.evaluatorQueue = evaluatorQueue;
-      this.checkpointMark = null;
-    }
-
-    @Override
-    public void processElement(WindowedValue<Object> element) {
-      // Input elements are ignored; output is produced from the source in finishBundle().
-    }
-
-    @Override
-    public InProcessTransformResult finishBundle() throws IOException {
-      UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
-      try (UnboundedReader<OutputT> reader =
-              createReader(
-                  transform.getTransform().getSource(), evaluationContext.getPipelineOptions())) {
-        int numElements = 0;
-        if (reader.start()) {
-          do {
-            output.add(
-                WindowedValue.timestampedValueInGlobalWindow(
-                    reader.getCurrent(), reader.getCurrentTimestamp()));
-            numElements++;
-          } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
-        }
-        checkpointMark = reader.getCheckpointMark();
-        checkpointMark.finalizeCheckpoint();
-        // TODO: When generation of initial splits is exercised, make this the minimum watermark
-        // across all existing readers
-        StepTransformResult result =
-            StepTransformResult.withHold(transform, reader.getWatermark())
-                .addOutput(output)
-                .build();
-        evaluatorQueue.offer(this);
-        return result;
-      }
-    }
-
-    private <CheckpointMarkT extends CheckpointMark> UnboundedReader<OutputT> createReader(
-        UnboundedSource<OutputT, CheckpointMarkT> source, PipelineOptions options) {
-      @SuppressWarnings("unchecked")
-      CheckpointMarkT mark = (CheckpointMarkT) checkpointMark;
-      return source.createReader(options, mark);
-    }
-  }
-}
-
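
getTransformEvaluatorQueue above uses a standard ConcurrentMap initialization idiom:
optimistically build a queue, publish it with putIfAbsent, and fall back to whichever
queue won the race. A stripped-down sketch of just that idiom (the generic key and
value types are placeholders):

import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

final class QueueRegistry<K, V> {
  private final ConcurrentMap<K, Queue<V>> queues = new ConcurrentHashMap<>();

  /** Returns the queue for key, creating it exactly once across threads. */
  Queue<V> queueFor(K key) {
    Queue<V> queue = queues.get(key);
    if (queue == null) {
      queue = new ConcurrentLinkedQueue<>();
      Queue<V> existing = queues.putIfAbsent(key, queue);
      if (existing != null) {
        queue = existing; // another thread published first; use its queue
      }
    }
    return queue;
  }
}

Since putIfAbsent returns the previously mapped value when it loses the race, the
second map lookup in the original method could be replaced by that return value.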

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
deleted file mode 100644
index dd2bfb1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.Values;
-import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
-import com.google.cloud.dataflow.sdk.transforms.WithKeys;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
- * {@link CreatePCollectionView} primitive {@link PTransform}.
- *
- * <p>The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator TransformEvaluators} for
- * the {@link WriteView} {@link PTransform}, which is part of the
- * {@link InProcessCreatePCollectionView} composite transform. This transform is an override for the
- * {@link CreatePCollectionView} transform that applies windowing and triggers before the view is
- * written.
- */
-class ViewEvaluatorFactory implements TransformEvaluatorFactory {
-  @Override
-  public <T> TransformEvaluator<T> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      InProcessPipelineRunner.CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<T> evaluator = (TransformEvaluator<T>) createEvaluator(
-            (AppliedPTransform) application, evaluationContext);
-    return evaluator;
-  }
-
-  private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator(
-      final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>>
-          application,
-      InProcessEvaluationContext context) {
-    PCollection<Iterable<InT>> input = application.getInput();
-    final PCollectionViewWriter<InT, OuT> writer =
-        context.createPCollectionViewWriter(input, application.getOutput());
-    return new TransformEvaluator<Iterable<InT>>() {
-      private final List<WindowedValue<InT>> elements = new ArrayList<>();
-
-      @Override
-      public void processElement(WindowedValue<Iterable<InT>> element) {
-        for (InT value : element.getValue()) {
-          elements.add(element.withValue(value));
-        }
-      }
-
-      @Override
-      public InProcessTransformResult finishBundle() {
-        writer.add(elements);
-        return StepTransformResult.withoutHold(application).build();
-      }
-    };
-  }
-
-  /**
-   * An in-process override for {@link CreatePCollectionView}.
-   */
-  public static class InProcessCreatePCollectionView<ElemT, ViewT>
-      extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
-    private final CreatePCollectionView<ElemT, ViewT> og;
-
-    private InProcessCreatePCollectionView(CreatePCollectionView<ElemT, ViewT> og) {
-      this.og = og;
-    }
-
-    @Override
-    public PCollectionView<ViewT> apply(PCollection<ElemT> input) {
-      return input.apply(WithKeys.<Void, ElemT>of((Void) null))
-          .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
-          .apply(GroupByKey.<Void, ElemT>create())
-          .apply(Values.<Iterable<ElemT>>create())
-          .apply(new WriteView<ElemT, ViewT>(og));
-    }
-  }
-
-  /**
-   * An in-process implementation of the {@link CreatePCollectionView} primitive.
-   *
-   * This implementation requires the input {@link PCollection} to be an iterable, which is provided
-   * to {@link PCollectionView#fromIterableInternal(Iterable)}.
-   */
-  public static final class WriteView<ElemT, ViewT>
-      extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> {
-    private final CreatePCollectionView<ElemT, ViewT> og;
-
-    WriteView(CreatePCollectionView<ElemT, ViewT> og) {
-      this.og = og;
-    }
-
-    @Override
-    public PCollectionView<ViewT> apply(PCollection<Iterable<ElemT>> input) {
-      return og.getView();
-    }
-  }
-}
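
The expansion in InProcessCreatePCollectionView relies on a common trick: key every
element with the same (null) key so that GroupByKey gathers the entire PCollection
into a single Iterable, which WriteView then hands to the view. The same reshaping
can be sketched outside the SDK with plain Java streams (Boolean.TRUE stands in for
the null key, since Collectors.groupingBy rejects null keys):

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class NullKeyGrouping {
  public static void main(String[] args) {
    List<String> elements = Arrays.asList("a", "b", "c");
    // Classify everything under one constant key, then group: all elements land
    // in a single iterable, the shape the WriteView primitive expects.
    Map<Boolean, List<String>> grouped =
        elements.stream().collect(Collectors.groupingBy(e -> Boolean.TRUE));
    System.out.println(grouped.get(Boolean.TRUE)); // [a, b, c]
  }
}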

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java
deleted file mode 100644
index 27d59b9..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Ordering;
-
-import org.joda.time.Instant;
-
-import java.util.PriorityQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * Executes callbacks based on the progression of each step's watermark.
- *
- * <p>Callbacks are registered by calls to
- * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)},
- * and are executed after a call to {@link #fireForWatermark(AppliedPTransform, Instant)} with the
- * same {@link AppliedPTransform} and a watermark sufficient to ensure that the trigger for the
- * windowing strategy would have fired.
- *
- * <p>NOTE: {@link WatermarkCallbackExecutor} does not track the latest observed watermark for any
- * {@link AppliedPTransform} - any call to
- * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)}
- * that could have potentially already fired should be followed by a call to
- * {@link #fireForWatermark(AppliedPTransform, Instant)} for the same transform with the current
- * value of the watermark.
- */
-class WatermarkCallbackExecutor {
-  /**
-   * Create a new {@link WatermarkCallbackExecutor}.
-   */
-  public static WatermarkCallbackExecutor create() {
-    return new WatermarkCallbackExecutor();
-  }
-
-  private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>>
-      callbacks;
-  private final ExecutorService executor;
-
-  private WatermarkCallbackExecutor() {
-    this.callbacks = new ConcurrentHashMap<>();
-    this.executor = Executors.newSingleThreadExecutor();
-  }
-
-  /**
-   * Execute the provided {@link Runnable} after the next call to
-   * {@link #fireForWatermark(AppliedPTransform, Instant)} where the window is guaranteed to have
-   * produced output.
-   */
-  public void callOnGuaranteedFiring(
-      AppliedPTransform<?, ?, ?> step,
-      BoundedWindow window,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Runnable runnable) {
-    WatermarkCallback callback =
-        WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable);
-
-    PriorityQueue<WatermarkCallback> callbackQueue = callbacks.get(step);
-    if (callbackQueue == null) {
-      callbackQueue = new PriorityQueue<>(11, new CallbackOrdering());
-      if (callbacks.putIfAbsent(step, callbackQueue) != null) {
-        callbackQueue = callbacks.get(step);
-      }
-    }
-
-    synchronized (callbackQueue) {
-      callbackQueue.offer(callback);
-    }
-  }
-
-  /**
-   * Schedule all pending callbacks that must have produced output by the time of the provided
-   * watermark.
-   */
-  public void fireForWatermark(AppliedPTransform<?, ?, ?> step, Instant watermark) {
-    PriorityQueue<WatermarkCallback> callbackQueue = callbacks.get(step);
-    if (callbackQueue == null) {
-      return;
-    }
-    synchronized (callbackQueue) {
-      while (!callbackQueue.isEmpty() && callbackQueue.peek().shouldFire(watermark)) {
-        executor.submit(callbackQueue.poll().getCallback());
-      }
-    }
-  }
-
-  private static class WatermarkCallback {
-    public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring(
-        BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
-      @SuppressWarnings("unchecked")
-      Instant firingAfter =
-          strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window);
-      return new WatermarkCallback(firingAfter, callback);
-    }
-
-    private final Instant fireAfter;
-    private final Runnable callback;
-
-    private WatermarkCallback(Instant fireAfter, Runnable callback) {
-      this.fireAfter = fireAfter;
-      this.callback = callback;
-    }
-
-    public boolean shouldFire(Instant currentWatermark) {
-      return currentWatermark.isAfter(fireAfter)
-          || currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    }
-
-    public Runnable getCallback() {
-      return callback;
-    }
-  }
-
-  private static class CallbackOrdering extends Ordering<WatermarkCallback> {
-    @Override
-    public int compare(WatermarkCallback left, WatermarkCallback right) {
-      return ComparisonChain.start()
-          .compare(left.fireAfter, right.fireAfter)
-          .compare(left.callback, right.callback, Ordering.arbitrary())
-          .result();
-    }
-  }
-}
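
The bookkeeping above boils down to a priority queue ordered by firing time, drained
whenever the watermark advances. A self-contained sketch of that core, using
java.time.Instant in place of Joda's (all names here are illustrative, not the SDK's
API):

import java.time.Instant;
import java.util.Comparator;
import java.util.PriorityQueue;

final class WatermarkCallbacks {
  private static final class Callback {
    final Instant fireAfter;
    final Runnable runnable;

    Callback(Instant fireAfter, Runnable runnable) {
      this.fireAfter = fireAfter;
      this.runnable = runnable;
    }
  }

  private final PriorityQueue<Callback> queue =
      new PriorityQueue<>(Comparator.comparing((Callback c) -> c.fireAfter));

  synchronized void register(Instant fireAfter, Runnable runnable) {
    queue.offer(new Callback(fireAfter, runnable));
  }

  // Runs every callback whose firing time the watermark has passed. The original
  // also fires all pending callbacks at the end-of-time watermark; that special
  // case is omitted here.
  synchronized void fireForWatermark(Instant watermark) {
    while (!queue.isEmpty() && watermark.isAfter(queue.peek().fireAfter)) {
      queue.poll().runnable.run();
    }
  }
}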

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/package-info.java
deleted file mode 100644
index d1aa6af..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/package-info.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Defines runners for executing Pipelines in different modes, including
- * {@link com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner} and
- * {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}.
- *
- * <p>{@link com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner} executes a {@code Pipeline}
- * locally, without contacting the Dataflow service.
- * {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner} submits a
- * {@code Pipeline} to the Dataflow service, which executes it on Dataflow-managed Compute Engine
- * instances. {@code DataflowPipelineRunner} returns
- * as soon as the {@code Pipeline} has been submitted. Use
- * {@link com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner} to have execution
- * updates printed to the console.
- *
- * <p>The runner is specified as part of the
- * {@link com.google.cloud.dataflow.sdk.options.PipelineOptions}.
- */
-package com.google.cloud.dataflow.sdk.runners;
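
To make the last sentence concrete, a sketch of runner selection against this
(pre-Beam) Dataflow SDK API; treat the exact calls as assumptions from the 1.x API
rather than verified usage:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;

public class RunnerSelection {
  public static void main(String[] args) {
    // The runner can be supplied as --runner=... on the command line, or set
    // programmatically as below.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    options.setRunner(DirectPipelineRunner.class);
    Pipeline p = Pipeline.create(options);
    p.run();
  }
}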

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/IsmFormat.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/IsmFormat.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/IsmFormat.java
deleted file mode 100644
index 318de9b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/IsmFormat.java
+++ /dev/null
@@ -1,946 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.worker;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addLong;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
-import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.ListCoder;
-import com.google.cloud.dataflow.sdk.coders.StandardCoder;
-import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
-import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.RandomAccessData;
-import com.google.cloud.dataflow.sdk.util.VarInt;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-/**
- * An Ism file is a prefix-encoded composite key-value file broken into shards. Each composite
- * key is composed of a fixed number of component keys, a fixed number of which form the shard
- * key portion; see {@link IsmRecord} and {@link IsmRecordCoder} for further details on the data
- * format. In addition to the data, the file contains a bloom filter and multiple indices that
- * allow for efficient retrieval.
- *
- * <p>An Ism file is composed of these high level sections (in order):
- * <ul>
- *   <li>shard block</li>
- *   <li>bloom filter (See {@code ScalableBloomFilter} for details on encoding format)</li>
- *   <li>shard index</li>
- *   <li>footer (See {@link Footer} for details on encoding format)</li>
- * </ul>
- *
- * <p>The shard block is composed of multiple copies of the following:
- * <ul>
- *   <li>data block</li>
- *   <li>data index</li>
- * </ul>
- *
- * <p>The data block is composed of multiple copies of the following:
- * <ul>
- *   <li>key prefix (See {@link KeyPrefix} for details on encoding format)</li>
- *   <li>unshared key bytes</li>
- *   <li>value bytes</li>
- *   <li>optional 0x00 0x00 bytes followed by metadata bytes
- *       (if the following 0x00 0x00 bytes are not present, then there are no metadata bytes)</li>
- * </ul>
- * Keys written into the data block must be in unsigned lexicographically increasing order,
- * and the shard portion of each key must hash to the same shard id as all other keys
- * within the same data block. The hash function used is the
- * <a href="http://smhasher.googlecode.com/svn/trunk/MurmurHash3.cpp">
- * 32-bit murmur3 algorithm, x86 variant</a> (little-endian variant),
- * using {@code 1225801234} as the seed value.
- *
- * <p>The data index is composed of {@code N} copies of the following:
- * <ul>
- *   <li>key prefix (See {@link KeyPrefix} for details on encoding format)</li>
- *   <li>unshared key bytes</li>
- *   <li>byte offset to key prefix in data block (variable length long coding)</li>
- * </ul>
- *
- * <p>The shard index is composed of a {@link VarInt variable length integer} encoding representing
- * the number of shard index records followed by that many shard index records.
- * See {@link IsmShardCoder} for further details as to its encoding scheme.
- */
-public class IsmFormat {
-  private static final int HASH_SEED = 1225801234;
-  private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32(HASH_SEED);
-  static final int SHARD_BITS = 0x7F; // [0-127] shards + [128-255] metadata shards
-
-  /**
-   * A record containing a composite key and either a value or metadata. The composite key
-   * must not contain the metadata key component placeholder if producing a value record, and must
-   * contain the metadata key component placeholder if producing a metadata record.
-   *
-   * <p>The composite key is a fixed number of component keys where the first {@code N} component
-   * keys are used to create a shard id via hashing. See {@link IsmRecordCoder#hash(List)} for
-   * further details.
-   */
-  public static class IsmRecord<V> {
-    /** Returns an IsmRecord with the specified key components and value. */
-    public static <V> IsmRecord<V> of(List<?> keyComponents, V value) {
-      checkNotNull(keyComponents);
-      checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
-      checkArgument(!isMetadataKey(keyComponents),
-          "Expected key components to not contain metadata key.");
-      return new IsmRecord<>(keyComponents, value, null);
-    }
-
-    public static <V> IsmRecord<V> meta(List<?> keyComponents, byte[] metadata) {
-      checkNotNull(keyComponents);
-      checkNotNull(metadata);
-      checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
-      checkArgument(isMetadataKey(keyComponents),
-          "Expected key components to contain metadata key.");
-      return new IsmRecord<V>(keyComponents, null, metadata);
-    }
-
-    private final List<?> keyComponents;
-    @Nullable
-    private final V value;
-    @Nullable
-    private final byte[] metadata;
-    private IsmRecord(List<?> keyComponents, V value, byte[] metadata) {
-      this.keyComponents = keyComponents;
-      this.value = value;
-      this.metadata = metadata;
-    }
-
-    /** Returns the list of key components. */
-    public List<?> getKeyComponents() {
-      return keyComponents;
-    }
-
-    /** Returns the key component at the specified index. */
-    public Object getKeyComponent(int index) {
-      return keyComponents.get(index);
-    }
-
-    /**
-     * Returns the value. Throws {@link IllegalStateException} if this is not a
-     * value record.
-     */
-    public V getValue() {
-      checkState(!isMetadataKey(keyComponents),
-          "This is a metadata record and not a value record.");
-      return value;
-    }
-
-    /**
-     * Returns the metadata. Throws {@link IllegalStateException} if this is not a
-     * metadata record.
-     */
-    public byte[] getMetadata() {
-      checkState(isMetadataKey(keyComponents),
-          "This is a value record and not a metadata record.");
-      return metadata;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (!(obj instanceof IsmRecord)) {
-        return false;
-      }
-      IsmRecord<?> other = (IsmRecord<?>) obj;
-      return Objects.equal(keyComponents, other.keyComponents)
-          && Objects.equal(value, other.value)
-          && Arrays.equals(metadata, other.metadata);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(keyComponents, value, Arrays.hashCode(metadata));
-    }
-
-    @Override
-    public String toString() {
-      ToStringHelper builder = MoreObjects.toStringHelper(IsmRecord.class)
-          .add("keyComponents", keyComponents);
-      if (isMetadataKey(keyComponents)) {
-        builder.add("metadata", metadata);
-      } else {
-        builder.add("value", value);
-      }
-      return builder.toString();
-    }
-  }
-
-  /** A {@link Coder} for {@link IsmRecord}s.
-   *
-   * <p>Note that this coder on its own will not produce an Ism file. This coder can be used
-   * to materialize a {@link PCollection} of {@link IsmRecord}s. Only when this coder
-   * is combined with an {@link IsmSink} will it produce an Ism file.
-   *
-   * <p>The {@link IsmRecord} encoded format is:
-   * <ul>
-   *   <li>encoded key component 1 using key component coder 1</li>
-   *   <li>...</li>
-   *   <li>encoded key component N using key component coder N</li>
-   *   <li>encoded value using value coder</li>
-   * </ul>
-   */
-  public static class IsmRecordCoder<V>
-      extends StandardCoder<IsmRecord<V>> {
-    /** Returns an IsmRecordCoder with the specified key component coders and value coder. */
-    public static <V> IsmRecordCoder<V> of(
-        int numberOfShardKeyCoders,
-        int numberOfMetadataShardKeyCoders,
-        List<Coder<?>> keyComponentCoders,
-        Coder<V> valueCoder) {
-      checkNotNull(keyComponentCoders);
-      checkArgument(keyComponentCoders.size() > 0);
-      checkArgument(numberOfShardKeyCoders > 0);
-      checkArgument(numberOfShardKeyCoders <= keyComponentCoders.size());
-      checkArgument(numberOfMetadataShardKeyCoders <= keyComponentCoders.size());
-      return new IsmRecordCoder<>(
-          numberOfShardKeyCoders,
-          numberOfMetadataShardKeyCoders,
-          keyComponentCoders,
-          valueCoder);
-    }
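    // Illustrative only (hypothetical usage, not taken from this file): a coder
    // whose composite key is a single byte[] shard key component with long values
    // could be constructed as
    //
    //   IsmRecordCoder<Long> coder = IsmRecordCoder.of(
    //       1,  // one shard key coder
    //       0,  // no metadata shard key coders
    //       ImmutableList.<Coder<?>>of(ByteArrayCoder.of()),
    //       VarLongCoder.of());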
-
-    /**
-     * Returns an IsmRecordCoder with the specified coders. Note that this method is not meant
-     * to be called by users but used by Jackson when decoding this coder.
-     */
-    @JsonCreator
-    public static IsmRecordCoder<?> of(
-        @JsonProperty(PropertyNames.NUM_SHARD_CODERS) int numberOfShardCoders,
-        @JsonProperty(PropertyNames.NUM_METADATA_SHARD_CODERS) int numberOfMetadataShardCoders,
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
-      Preconditions.checkArgument(components.size() >= 2,
-          "Expecting at least 2 components, got " + components.size());
-      return of(
-          numberOfShardCoders,
-          numberOfMetadataShardCoders,
-          components.subList(0, components.size() - 1),
-          components.get(components.size() - 1));
-    }
-
-    private final int numberOfShardKeyCoders;
-    private final int numberOfMetadataShardKeyCoders;
-    private final List<Coder<?>> keyComponentCoders;
-    private final Coder<V> valueCoder;
-
-    private IsmRecordCoder(
-        int numberOfShardKeyCoders,
-        int numberOfMetadataShardKeyCoders,
-        List<Coder<?>> keyComponentCoders, Coder<V> valueCoder) {
-      this.numberOfShardKeyCoders = numberOfShardKeyCoders;
-      this.numberOfMetadataShardKeyCoders = numberOfMetadataShardKeyCoders;
-      this.keyComponentCoders = keyComponentCoders;
-      this.valueCoder = valueCoder;
-    }
-
-    /** Returns the list of key component coders. */
-    public List<Coder<?>> getKeyComponentCoders() {
-      return keyComponentCoders;
-    }
-
-    /** Returns the key coder at the specified index. */
-    public Coder getKeyComponentCoder(int index) {
-      return keyComponentCoders.get(index);
-    }
-
-    /** Returns the value coder. */
-    public Coder<V> getValueCoder() {
-      return valueCoder;
-    }
-
-    @Override
-    public void encode(IsmRecord<V> value, OutputStream outStream,
-        Coder.Context context) throws CoderException, IOException {
-      if (value.getKeyComponents().size() != keyComponentCoders.size()) {
-        throw new CoderException(String.format(
-            "Expected %s key component(s) but received key component(s) %s.",
-            keyComponentCoders.size(), value.getKeyComponents()));
-      }
-      for (int i = 0; i < keyComponentCoders.size(); ++i) {
-        getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream, context.nested());
-      }
-      if (isMetadataKey(value.getKeyComponents())) {
-        ByteArrayCoder.of().encode(value.getMetadata(), outStream, context.nested());
-      } else {
-        valueCoder.encode(value.getValue(), outStream, context.nested());
-      }
-    }
-
-    @Override
-    public IsmRecord<V> decode(InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      List<Object> keyComponents = new ArrayList<>(keyComponentCoders.size());
-      for (Coder<?> keyCoder : keyComponentCoders) {
-        keyComponents.add(keyCoder.decode(inStream, context.nested()));
-      }
-      if (isMetadataKey(keyComponents)) {
-        return IsmRecord.<V>meta(
-            keyComponents, ByteArrayCoder.of().decode(inStream, context.nested()));
-      } else {
-        return IsmRecord.<V>of(keyComponents, valueCoder.decode(inStream, context.nested()));
-      }
-    }
-
-    int getNumberOfShardKeyCoders(List<?> keyComponents) {
-      if (isMetadataKey(keyComponents)) {
-        return numberOfMetadataShardKeyCoders;
-      } else {
-        return numberOfShardKeyCoders;
-      }
-    }
-
-    /**
-     * Computes the shard id for the given key component(s).
-     *
-     * <p>The shard keys are encoded into their byte representations and hashed using the
-     * <a href="http://smhasher.googlecode.com/svn/trunk/MurmurHash3.cpp">
-     * 32-bit murmur3 algorithm, x86 variant</a> (little-endian variant),
-     * using {@code 1225801234} as the seed value. We ensure that shard ids for
-     * metadata keys and normal keys do not overlap.
-     */
-    public int hash(List<?> keyComponents) {
-      return encodeAndHash(keyComponents, new RandomAccessData(), new ArrayList<Integer>());
-    }
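    // A standalone sketch of the computation above, using Guava's Hashing as this
    // file already does; the seed and mask mirror HASH_SEED and SHARD_BITS:
    //
    //   HashFunction fn = Hashing.murmur3_32(1225801234);
    //   byte[] encodedShardKey = ...;  // nested encodings of the shard key components
    //   int shardId = fn.hashBytes(encodedShardKey, 0, encodedShardKey.length).asInt() & 0x7F;
    //
    // Metadata keys are then offset by SHARD_BITS + 1, giving ids in [128, 255].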
-
-    /**
-     * Computes the shard id for the given key component(s).
-     *
-     * <p>Mutates {@code keyBytesToMutate} such that, when this method returns, it contains the
-     * encoded version of the key components.
-     */
-    int encodeAndHash(List<?> keyComponents, RandomAccessData keyBytesToMutate) {
-      return encodeAndHash(keyComponents, keyBytesToMutate, new ArrayList<Integer>());
-    }
-
-    /**
-     * Computes the shard id for the given key component(s).
-     *
-     * <p>Mutates {@code keyBytesToMutate} such that, when this method returns, it contains the
-     * encoded version of the key components. Also mutates {@code keyComponentByteOffsetsToMutate}
-     * to store the location where each key component's encoded byte representation ends within
-     * {@code keyBytesToMutate}.
-     */
-    int encodeAndHash(
-        List<?> keyComponents,
-        RandomAccessData keyBytesToMutate,
-        List<Integer> keyComponentByteOffsetsToMutate) {
-      checkNotNull(keyComponents);
-      checkArgument(keyComponents.size() <= keyComponentCoders.size(),
-          "Expected at most %s key component(s) but received %s.",
-          keyComponentCoders.size(), keyComponents);
-
-      final int numberOfKeyCodersToUse;
-      final int shardOffset;
-      if (isMetadataKey(keyComponents)) {
-        numberOfKeyCodersToUse = numberOfMetadataShardKeyCoders;
-        shardOffset = SHARD_BITS + 1;
-      } else {
-        numberOfKeyCodersToUse = numberOfShardKeyCoders;
-        shardOffset = 0;
-      }
-
-      checkArgument(numberOfKeyCodersToUse <= keyComponents.size(),
-          "Expected at least %s key component(s) but received %s.",
-          numberOfKeyCodersToUse, keyComponents);
-
-      try {
-        // Encode the shard portion
-        for (int i = 0; i < numberOfKeyCodersToUse; ++i) {
-          getKeyComponentCoder(i).encode(
-              keyComponents.get(i), keyBytesToMutate.asOutputStream(), Context.NESTED);
-          keyComponentByteOffsetsToMutate.add(keyBytesToMutate.size());
-        }
-        int rval = HASH_FUNCTION.hashBytes(
-            keyBytesToMutate.array(), 0, keyBytesToMutate.size()).asInt() & SHARD_BITS;
-        rval += shardOffset;
-
-        // Encode the remainder
-        for (int i = numberOfKeyCodersToUse; i < keyComponents.size(); ++i) {
-          getKeyComponentCoder(i).encode(
-              keyComponents.get(i), keyBytesToMutate.asOutputStream(), Context.NESTED);
-          keyComponentByteOffsetsToMutate.add(keyBytesToMutate.size());
-        }
-        return rval;
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            String.format("Failed to hash %s with coder %s", keyComponents, this), e);
-      }
-    }
-
-    @Override
-    public List<Coder<?>> getCoderArguments() {
-      return ImmutableList.<Coder<?>>builder()
-          .addAll(keyComponentCoders)
-          .add(valueCoder)
-          .build();
-    }
-
-    @Override
-    public CloudObject asCloudObject() {
-      CloudObject cloudObject = super.asCloudObject();
-      addLong(cloudObject, PropertyNames.NUM_SHARD_CODERS, numberOfShardKeyCoders);
-      addLong(cloudObject, PropertyNames.NUM_METADATA_SHARD_CODERS, numberOfMetadataShardKeyCoders);
-      return cloudObject;
-    }
-
-    @Override
-    public void verifyDeterministic() throws Coder.NonDeterministicException {
-      verifyDeterministic("Key component coders expected to be deterministic.", keyComponentCoders);
-      verifyDeterministic("Value coder expected to be deterministic.", valueCoder);
-    }
-
-    @Override
-    public boolean consistentWithEquals() {
-      for (Coder<?> keyComponentCoder : keyComponentCoders) {
-        if (!keyComponentCoder.consistentWithEquals()) {
-          return false;
-        }
-      }
-      return valueCoder.consistentWithEquals();
-    }
-
-    @Override
-    public Object structuralValue(IsmRecord<V> record) throws Exception {
-      checkState(record.getKeyComponents().size() == keyComponentCoders.size(),
-          "Expected the number of key component coders %s "
-          + "to match the number of key components %s.",
-          keyComponentCoders.size(), record.getKeyComponents());
-
-      if (record != null && consistentWithEquals()) {
-        ArrayList<Object> keyComponentStructuralValues = new ArrayList<>();
-        for (int i = 0; i < keyComponentCoders.size(); ++i) {
-          keyComponentStructuralValues.add(
-              getKeyComponentCoder(i).structuralValue(record.getKeyComponent(i)));
-        }
-        if (isMetadataKey(record.getKeyComponents())) {
-          return IsmRecord.meta(keyComponentStructuralValues, record.getMetadata());
-        } else {
-          return IsmRecord.of(keyComponentStructuralValues,
-              valueCoder.structuralValue(record.getValue()));
-        }
-      }
-      return super.structuralValue(record);
-    }
-  }
-
-  /**
-   * Validates that the key portion of the given coder is deterministic.
-   */
-  static void validateCoderIsCompatible(IsmRecordCoder<?> coder) {
-    for (Coder<?> keyComponentCoder : coder.getKeyComponentCoders()) {
-      try {
-        keyComponentCoder.verifyDeterministic();
-      } catch (NonDeterministicException e) {
-        throw new IllegalArgumentException(
-            String.format("Key component coder %s is expected to be deterministic.",
-                keyComponentCoder), e);
-      }
-    }
-  }
-
-  /** Returns true if and only if any of the passed in key components represent a metadata key. */
-  public static boolean isMetadataKey(List<?> keyComponents) {
-    for (Object keyComponent : keyComponents) {
-      if (keyComponent == METADATA_KEY) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /** A marker object representing the wildcard metadata key component. */
-  private static final Object METADATA_KEY = new Object() {
-    @Override
-    public String toString() {
-      return "META";
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      return this == obj;
-    }
-
-    @Override
-    public int hashCode() {
-      return -1248902349;
-    }
-  };
-
-  /**
-   * An object representing a wildcard for a key component.
-   * Encoded using {@link MetadataKeyCoder}.
-   */
-  public static Object getMetadataKey() {
-    return METADATA_KEY;
-  }
-
-  /**
-   * A coder for the metadata key component. Can be used to wrap a key component coder, allowing
-   * the metadata key component to be used as a placeholder instead of an actual key.
-   */
-  public static class MetadataKeyCoder<K> extends StandardCoder<K> {
-    public static <K> MetadataKeyCoder<K> of(Coder<K> keyCoder) {
-      checkNotNull(keyCoder);
-      return new MetadataKeyCoder<>(keyCoder);
-    }
-
-    /**
-     * Returns a MetadataKeyCoder with the specified coders. Note that this method is not meant
-     * to be called by users but used by Jackson when decoding this coder.
-     */
-    @JsonCreator
-    public static MetadataKeyCoder<?> of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
-      Preconditions.checkArgument(components.size() == 1,
-          "Expecting one component, got " + components.size());
-      return of(components.get(0));
-    }
-
-    private final Coder<K> keyCoder;
-
-    private MetadataKeyCoder(Coder<K> keyCoder) {
-      this.keyCoder = keyCoder;
-    }
-
-    public Coder<K> getKeyCoder() {
-      return keyCoder;
-    }
-
-    @Override
-    public void encode(K value, OutputStream outStream, Coder.Context context)
-        throws CoderException, IOException {
-      if (value == METADATA_KEY) {
-        outStream.write(0);
-      } else {
-        outStream.write(1);
-        keyCoder.encode(value, outStream, context.nested());
-      }
-    }
-
-    @Override
-    public K decode(InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      int marker = inStream.read();
-      if (marker == 0) {
-        return (K) getMetadataKey();
-      } else if (marker == 1) {
-        return keyCoder.decode(inStream, context.nested());
-      } else {
-        throw new CoderException(String.format("Expected marker but got %s.", marker));
-      }
-    }
-
-    @Override
-    public List<Coder<?>> getCoderArguments() {
-      return ImmutableList.<Coder<?>>of(keyCoder);
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic("Expected key coder to be deterministic", keyCoder);
-    }
-  }
-
-  /**
-   * A shard descriptor containing shard id, the data block offset, and the index offset for the
-   * given shard.
-   */
-  public static class IsmShard {
-    private final int id;
-    private final long blockOffset;
-    private final long indexOffset;
-
-    /** Returns an IsmShard with the given id, block offset and no index offset. */
-    public static IsmShard of(int id, long blockOffset) {
-      IsmShard ismShard = new IsmShard(id, blockOffset, -1);
-      checkState(id >= 0,
-          "%s attempting to be written with negative shard id.",
-          ismShard);
-      checkState(blockOffset >= 0,
-          "%s attempting to be written with negative block offset.",
-          ismShard);
-      return ismShard;
-    }
-
-    /** Returns an IsmShard with the given id, block offset, and index offset. */
-    public static IsmShard of(int id, long blockOffset, long indexOffset) {
-      IsmShard ismShard = new IsmShard(id, blockOffset, indexOffset);
-      checkState(id >= 0,
-          "%s attempting to be written with negative shard id.",
-          ismShard);
-      checkState(blockOffset >= 0,
-          "%s attempting to be written with negative block offset.",
-          ismShard);
-      checkState(indexOffset >= 0,
-          "%s attempting to be written with negative index offset.",
-          ismShard);
-      return ismShard;
-    }
-
-    private IsmShard(int id, long blockOffset, long indexOffset) {
-      this.id = id;
-      this.blockOffset = blockOffset;
-      this.indexOffset = indexOffset;
-    }
-
-    /** Returns the shard id. */
-    public int getId() {
-      return id;
-    }
-
-    /** Returns the absolute position within the Ism file where the data block begins. */
-    public long getBlockOffset() {
-      return blockOffset;
-    }
-
-    /**
-     * Returns the absolute position within the Ism file where the index block begins.
-     * Throws {@link IllegalStateException} if the index offset was never specified.
-     */
-    public long getIndexOffset() {
-      checkState(indexOffset >= 0,
-            "Unable to fetch index offset because it was never specified.");
-      return indexOffset;
-    }
-
-    /** Returns a new IsmShard like this one with the specified index offset. */
-    public IsmShard withIndexOffset(long indexOffset) {
-      return of(id, blockOffset, indexOffset);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(IsmShard.class)
-          .add("id", id)
-          .add("blockOffset", blockOffset)
-          .add("indexOffset", indexOffset)
-          .toString();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (!(obj instanceof IsmShard)) {
-        return false;
-      }
-      IsmShard other = (IsmShard) obj;
-      return Objects.equal(id, other.id)
-          && Objects.equal(blockOffset, other.blockOffset)
-          && Objects.equal(indexOffset, other.indexOffset);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(id, blockOffset, indexOffset);
-    }
-  }
-
-  /**
-   * A {@link ListCoder} wrapping an {@link IsmShardCoder}, used to encode the shard index.
-   * See {@link ListCoder} and {@link IsmShardCoder} for their respective encoding
-   * specifications.
-   */
-  public static final Coder<List<IsmShard>> ISM_SHARD_INDEX_CODER =
-      ListCoder.of(IsmShardCoder.of());
-
-  /**
-   * A coder for {@link IsmShard}s.
-   *
-   * <p>The shard descriptor is encoded as:
-   * <ul>
-   *   <li>id (variable length integer encoding)</li>
-   *   <li>blockOffset (variable length long encoding)</li>
-   *   <li>indexOffset (variable length long encoding)</li>
-   * </ul>
-   */
-  public static class IsmShardCoder extends AtomicCoder<IsmShard> {
-    private static final IsmShardCoder INSTANCE = new IsmShardCoder();
-
-    /** Returns an IsmShardCoder. */
-    @JsonCreator
-    public static IsmShardCoder of() {
-      return INSTANCE;
-    }
-
-    private IsmShardCoder() {
-    }
-
-    @Override
-    public void encode(IsmShard value, OutputStream outStream, Coder.Context context)
-        throws CoderException, IOException {
-      checkState(value.getIndexOffset() >= 0,
-          "%s attempting to be written without index offset.",
-          value);
-      VarIntCoder.of().encode(value.getId(), outStream, context.nested());
-      VarLongCoder.of().encode(value.getBlockOffset(), outStream, context.nested());
-      VarLongCoder.of().encode(value.getIndexOffset(), outStream, context.nested());
-    }
-
-    @Override
-    public IsmShard decode(
-        InputStream inStream, Coder.Context context) throws CoderException, IOException {
-      return IsmShard.of(
-          VarIntCoder.of().decode(inStream, context),
-          VarLongCoder.of().decode(inStream, context),
-          VarLongCoder.of().decode(inStream, context));
-    }
-
-    @Override
-    public boolean consistentWithEquals() {
-      return true;
-    }
-  }
-
-  /**
-   * The prefix stored before each key, recording how many bytes the key shares with the
-   * previously read key and how many unshared bytes follow. The key prefix, together with
-   * the previous key and the unshared key bytes, allows one to reconstruct the current key:
-   * {@code currentKey = previousKey[0 : sharedBytes] + read(unsharedBytes)}.
-   *
-   * <p>The key prefix is encoded as:
-   * <ul>
-   *   <li>number of shared key bytes (variable length integer coding)</li>
-   *   <li>number of unshared key bytes (variable length integer coding)</li>
-   * </ul>
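-   *
-   * <p>As a worked example of the formula above: if the previous key was {@code "apple"}
-   * and the current key is {@code "apply"}, the prefix is {@code KeyPrefix(4, 1)}; the
-   * first four bytes are shared with the previous key and the single unshared byte
-   * {@code 'y'} follows the prefix.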
-   */
-  static class KeyPrefix {
-    private final int sharedKeySize;
-    private final int unsharedKeySize;
-
-    KeyPrefix(int sharedBytes, int unsharedBytes) {
-      this.sharedKeySize = sharedBytes;
-      this.unsharedKeySize = unsharedBytes;
-    }
-
-    public int getSharedKeySize() {
-      return sharedKeySize;
-    }
-
-    public int getUnsharedKeySize() {
-      return unsharedKeySize;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(sharedKeySize, unsharedKeySize);
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (other == this) {
-        return true;
-      }
-      if (!(other instanceof KeyPrefix)) {
-        return false;
-      }
-      KeyPrefix keyPrefix = (KeyPrefix) other;
-      return sharedKeySize == keyPrefix.sharedKeySize
-          && unsharedKeySize == keyPrefix.unsharedKeySize;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("sharedKeySize", sharedKeySize)
-          .add("unsharedKeySize", unsharedKeySize)
-          .toString();
-    }
-  }
-
-  /** A {@link Coder} for {@link KeyPrefix}. */
-  static final class KeyPrefixCoder extends AtomicCoder<KeyPrefix> {
-    private static final KeyPrefixCoder INSTANCE = new KeyPrefixCoder();
-
-    @JsonCreator
-    public static KeyPrefixCoder of() {
-      return INSTANCE;
-    }
-
-    @Override
-    public void encode(KeyPrefix value, OutputStream outStream, Coder.Context context)
-        throws CoderException, IOException {
-      VarInt.encode(value.sharedKeySize, outStream);
-      VarInt.encode(value.unsharedKeySize, outStream);
-    }
-
-    @Override
-    public KeyPrefix decode(InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      return new KeyPrefix(VarInt.decodeInt(inStream), VarInt.decodeInt(inStream));
-    }
-
-    @Override
-    public boolean consistentWithEquals() {
-      return true;
-    }
-
-    @Override
-    public boolean isRegisterByteSizeObserverCheap(KeyPrefix value, Coder.Context context) {
-      return true;
-    }
-
-    @Override
-    protected long getEncodedElementByteSize(KeyPrefix value, Coder.Context context)
-        throws Exception {
-      Preconditions.checkNotNull(value);
-      return VarInt.getLength(value.sharedKeySize) + VarInt.getLength(value.unsharedKeySize);
-    }
-  }
-
-  /**
-   * The footer stores the information required to locate the shard index and bloom filter.
-   * It also stores a version byte and the number of keys in the file.
-   *
-   * <p>The footer is encoded as the value containing, in order:
-   * <ul>
-   *   <li>start of shard index offset (big endian long coding)</li>
-   *   <li>start of bloom filter offset (big endian long coding)</li>
-   *   <li>number of keys in file (big endian long coding)</li>
-   *   <li>0x02 (version as a single byte)</li>
-   * </ul>
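-   *
-   * <p>Because every field is fixed width, the footer always occupies {@code FIXED_LENGTH}
-   * (3 * 8 + 1 = 25) bytes and can be located by seeking to 25 bytes before the end of
-   * the file.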
-   */
-  static class Footer {
-    static final int LONG_BYTES = 8;
-    static final int FIXED_LENGTH = 3 * LONG_BYTES + 1;
-    static final byte VERSION = 2;
-
-    private final long indexPosition;
-    private final long bloomFilterPosition;
-    private final long numberOfKeys;
-
-    Footer(long indexPosition, long bloomFilterPosition, long numberOfKeys) {
-      this.indexPosition = indexPosition;
-      this.bloomFilterPosition = bloomFilterPosition;
-      this.numberOfKeys = numberOfKeys;
-    }
-
-    public long getIndexPosition() {
-      return indexPosition;
-    }
-
-    public long getBloomFilterPosition() {
-      return bloomFilterPosition;
-    }
-
-    public long getNumberOfKeys() {
-      return numberOfKeys;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (other == this) {
-        return true;
-      }
-      if (!(other instanceof Footer)) {
-        return false;
-      }
-      Footer footer = (Footer) other;
-      return indexPosition == footer.indexPosition
-          && bloomFilterPosition == footer.bloomFilterPosition
-          && numberOfKeys == footer.numberOfKeys;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(indexPosition, bloomFilterPosition, numberOfKeys);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("version", Footer.VERSION)
-          .add("indexPosition", indexPosition)
-          .add("bloomFilterPosition", bloomFilterPosition)
-          .add("numberOfKeys", numberOfKeys)
-          .toString();
-    }
-  }
-
-  /** A {@link Coder} for {@link Footer}. */
-  static final class FooterCoder extends AtomicCoder<Footer> {
-    private static final FooterCoder INSTANCE = new FooterCoder();
-
-    @JsonCreator
-    public static FooterCoder of() {
-      return INSTANCE;
-    }
-
-    @Override
-    public void encode(Footer value, OutputStream outStream, Coder.Context context)
-        throws CoderException, IOException {
-      DataOutputStream dataOut = new DataOutputStream(outStream);
-      dataOut.writeLong(value.indexPosition);
-      dataOut.writeLong(value.bloomFilterPosition);
-      dataOut.writeLong(value.numberOfKeys);
-      dataOut.write(Footer.VERSION);
-    }
-
-    @Override
-    public Footer decode(InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      DataInputStream dataIn = new DataInputStream(inStream);
-      Footer footer = new Footer(dataIn.readLong(), dataIn.readLong(), dataIn.readLong());
-      int version = dataIn.read();
-      if (version != Footer.VERSION) {
-        throw new IOException("Unknown version " + version + ". "
-            + "Only version 2 is currently supported.");
-      }
-      return footer;
-    }
-
-    @Override
-    public boolean consistentWithEquals() {
-      return true;
-    }
-
-    @Override
-    public boolean isRegisterByteSizeObserverCheap(Footer value, Coder.Context context) {
-      return true;
-    }
-
-    @Override
-    protected long getEncodedElementByteSize(Footer value, Coder.Context context)
-        throws Exception {
-      return Footer.FIXED_LENGTH;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/package-info.java
deleted file mode 100644
index af0a345..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Implementation of the harness that runs on each Google Compute Engine instance to coordinate
- * execution of Pipeline code.
- */
-@ParametersAreNonnullByDefault
-package com.google.cloud.dataflow.sdk.runners.worker;
-
-import javax.annotation.ParametersAreNonnullByDefault;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/CoderProperties.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/CoderProperties.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/CoderProperties.java
deleted file mode 100644
index 5705dc4..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/CoderProperties.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasItem;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.Serializer;
-import com.google.cloud.dataflow.sdk.util.Structs;
-import com.google.cloud.dataflow.sdk.util.UnownedInputStream;
-import com.google.cloud.dataflow.sdk.util.UnownedOutputStream;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Properties for use in {@link Coder} tests. These are implemented with junit assertions
- * rather than as predicates for the sake of error messages.
- *
- * <p>We serialize and deserialize the coder to make sure that any state information required by
- * the coder is preserved. As a result, these tests will fail for coders that lose information
- * during serialization or that change state during encoding/decoding.
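- *
- * <p>A typical use in a coder test (a minimal sketch; {@code StringUtf8Coder} stands in for
- * the coder under test):
- *
- * <pre>{@code
- * CoderProperties.coderDeterministic(StringUtf8Coder.of(), "foo", "foo");
- * CoderProperties.coderDecodeEncodeEqual(StringUtf8Coder.of(), "hello");
- * }</pre>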
- */
-public class CoderProperties {
-
-  /**
-   * All the contexts, for use in test cases.
-   */
-  public static final List<Coder.Context> ALL_CONTEXTS = Arrays.asList(
-      Coder.Context.OUTER, Coder.Context.NESTED);
-
-  /**
-   * Verifies that for the given {@code Coder<T>}, and values of
-   * type {@code T}, if the values are equal then the encoded bytes are equal, in any
-   * {@code Coder.Context}.
-   */
-  public static <T> void coderDeterministic(
-      Coder<T> coder, T value1, T value2)
-      throws Exception {
-    for (Coder.Context context : ALL_CONTEXTS) {
-      coderDeterministicInContext(coder, context, value1, value2);
-    }
-  }
-
-  /**
-   * Verifies that for the given {@code Coder<T>}, {@code Coder.Context}, and values of
-   * type {@code T}, if the values are equal then the encoded bytes are equal.
-   */
-  public static <T> void coderDeterministicInContext(
-      Coder<T> coder, Coder.Context context, T value1, T value2)
-      throws Exception {
-
-    try {
-      coder.verifyDeterministic();
-    } catch (NonDeterministicException e) {
-      fail("Expected that the coder is deterministic");
-    }
-    assertThat("Expected that the passed in values are equal()", value1, equalTo(value2));
-    assertThat(
-        encode(coder, context, value1),
-        equalTo(encode(coder, context, value2)));
-  }
-
-  /**
-   * Verifies that for the given {@code Coder<T>},
-   * and value of type {@code T}, encoding followed by decoding yields an
-   * equal value of type {@code T}, in any {@code Coder.Context}.
-   */
-  public static <T> void coderDecodeEncodeEqual(
-      Coder<T> coder, T value)
-      throws Exception {
-    for (Coder.Context context : ALL_CONTEXTS) {
-      coderDecodeEncodeEqualInContext(coder, context, value);
-    }
-  }
-
-  /**
-   * Verifies that for the given {@code Coder<T>}, {@code Coder.Context},
-   * and value of type {@code T}, encoding followed by decoding yields an
-   * equal value of type {@code T}.
-   */
-  public static <T> void coderDecodeEncodeEqualInContext(
-      Coder<T> coder, Coder.Context context, T value)
-      throws Exception {
-    assertThat(decodeEncode(coder, context, value), equalTo(value));
-  }
-
-  /**
-   * Verifies that for the given {@code Coder<Collection<T>>},
-   * and value of type {@code Collection<T>}, encoding followed by decoding yields an
-   * equal value of type {@code Collection<T>}, in any {@code Coder.Context}.
-   */
-  public static <T, CollectionT extends Collection<T>> void coderDecodeEncodeContentsEqual(
-      Coder<CollectionT> coder, CollectionT value)
-      throws Exception {
-    for (Coder.Context context : ALL_CONTEXTS) {
-      coderDecodeEncodeContentsEqualInContext(coder, context, value);
-    }
-  }
-
-  /**
-   * Verifies that for the given {@code Coder<Collection<T>>},
-   * and value of type {@code Collection<T>}, encoding followed by decoding yields a value
-   * with equal contents, irrespective of order, in the given {@code Coder.Context}.
-   */
-  @SuppressWarnings("unchecked")
-  public static <T, CollectionT extends Collection<T>> void coderDecodeEncodeContentsEqualInContext(
-      Coder<CollectionT> coder, Coder.Context context, CollectionT value)
-      throws Exception {
-    // Matchers.containsInAnyOrder() requires at least one element
-    Collection<T> result = decodeEncode(coder, context, value);
-    if (value.isEmpty()) {
-      assertThat(result, emptyIterable());
-    } else {
-      // This is the only Matchers.containsInAnyOrder() overload that takes literal values
-      assertThat(result, containsInAnyOrder((T[]) value.toArray()));
-    }
-  }
-
-  /**
-   * Verifies that for the given {@code Coder<Iterable<T>>},
-   * and value of type {@code Iterable<T>}, encoding followed by decoding yields a value
-   * whose contents are equal and in the same order, in any {@code Coder.Context}.
-   */
-  public static <T, IterableT extends Iterable<T>> void coderDecodeEncodeContentsInSameOrder(
-      Coder<IterableT> coder, IterableT value)
-      throws Exception {
-    for (Coder.Context context : ALL_CONTEXTS) {
-      CoderProperties.<T, IterableT>coderDecodeEncodeContentsInSameOrderInContext(
-          coder, context, value);
-    }
-  }
-
-  /**
-   * Verifies that for the given {@code Coder<Iterable<T>>},
-   * and value of type {@code Iterable<T>}, encoding followed by decoding yields a value
-   * whose contents are equal and in the same order, in the given {@code Coder.Context}.
-   */
-  @SuppressWarnings("unchecked")
-  public static <T, IterableT extends Iterable<T>> void
-      coderDecodeEncodeContentsInSameOrderInContext(
-          Coder<IterableT> coder, Coder.Context context, IterableT value)
-      throws Exception {
-    Iterable<T> result = decodeEncode(coder, context, value);
-    // Matchers.contains() requires at least one element
-    if (Iterables.isEmpty(value)) {
-      assertThat(result, emptyIterable());
-    } else {
-      // This is the only Matchers.contains() overload that takes literal values
-      assertThat(result, contains((T[]) Iterables.toArray(value, Object.class)));
-    }
-  }
-
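-  /** Verifies that the given {@code Coder<T>} can be serialized and deserialized successfully. */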
-  public static <T> void coderSerializable(Coder<T> coder) {
-    SerializableUtils.ensureSerializable(coder);
-  }
-
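-  /**
-   * Verifies that for the given {@code Coder<T>} and values of type {@code T}, the values are
-   * equal if and only if their encoded bytes are equal, in any {@code Coder.Context}.
-   */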
-  public static <T> void coderConsistentWithEquals(
-      Coder<T> coder, T value1, T value2)
-      throws Exception {
-
-    for (Coder.Context context : ALL_CONTEXTS) {
-      CoderProperties.<T>coderConsistentWithEqualsInContext(coder, context, value1, value2);
-    }
-  }
-
-  public static <T> void coderConsistentWithEqualsInContext(
-      Coder<T> coder, Coder.Context context, T value1, T value2) throws Exception {
-
-    assertEquals(
-        value1.equals(value2),
-        Arrays.equals(
-            encode(coder, context, value1),
-            encode(coder, context, value2)));
-  }
-
-  public static <T> void coderHasEncodingId(Coder<T> coder, String encodingId) throws Exception {
-    assertThat(coder.getEncodingId(), equalTo(encodingId));
-    assertThat(Structs.getString(coder.asCloudObject(), PropertyNames.ENCODING_ID, ""),
-        equalTo(encodingId));
-  }
-
-  public static <T> void coderAllowsEncoding(Coder<T> coder, String encodingId) throws Exception {
-    assertThat(coder.getAllowedEncodings(), hasItem(encodingId));
-    assertThat(
-        String.format("Expected to find \"%s\" in property \"%s\" of %s",
-            encodingId, PropertyNames.ALLOWED_ENCODINGS, coder.asCloudObject()),
-        Structs.getStrings(
-            coder.asCloudObject(),
-            PropertyNames.ALLOWED_ENCODINGS,
-            Collections.<String>emptyList()),
-        hasItem(encodingId));
-  }
-
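-  /**
-   * Verifies that for the given {@code Coder<T>} and values of type {@code T}, the structural
-   * values are equal if and only if the encoded bytes are equal, in any {@code Coder.Context}.
-   */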
-  public static <T> void structuralValueConsistentWithEquals(
-      Coder<T> coder, T value1, T value2)
-      throws Exception {
-
-    for (Coder.Context context : ALL_CONTEXTS) {
-      CoderProperties.<T>structuralValueConsistentWithEqualsInContext(
-          coder, context, value1, value2);
-    }
-  }
-
-  public static <T> void structuralValueConsistentWithEqualsInContext(
-      Coder<T> coder, Coder.Context context, T value1, T value2) throws Exception {
-
-    assertEquals(
-        coder.structuralValue(value1).equals(coder.structuralValue(value2)),
-        Arrays.equals(
-            encode(coder, context, value1),
-            encode(coder, context, value2)));
-  }
-
-
-  private static final String DECODING_WIRE_FORMAT_MESSAGE =
-      "Decoded value from known wire format does not match expected value."
-      + " This probably means that this Coder no longer correctly decodes"
-      + " a prior wire format. Changing the wire formats this Coder can read"
-      + " should be avoided, as it is likely to cause breakage."
-      + " If you truly intend to change the backwards compatibility for this Coder "
-      + " then you must remove any now-unsupported encodings from getAllowedEncodings().";
-
-  public static <T> void coderDecodesBase64(Coder<T> coder, String base64Encoding, T value)
-      throws Exception {
-    assertThat(DECODING_WIRE_FORMAT_MESSAGE, CoderUtils.decodeFromBase64(coder, base64Encoding),
-        equalTo(value));
-  }
-
-  public static <T> void coderDecodesBase64(
-      Coder<T> coder, List<String> base64Encodings, List<T> values) throws Exception {
-    assertThat("List of base64 encodings has different size than List of values",
-        base64Encodings.size(), equalTo(values.size()));
-
-    for (int i = 0; i < base64Encodings.size(); i++) {
-      coderDecodesBase64(coder, base64Encodings.get(i), values.get(i));
-    }
-  }
-
-  private static final String ENCODING_WIRE_FORMAT_MESSAGE =
-      "Encoded value does not match expected wire format."
-      + " Changing the wire format should be avoided, as it is likely to cause breakage."
-      + " If you truly intend to change the wire format for this Coder "
-      + " then you must update getEncodingId() to a new value and add any supported"
-      + " prior formats to getAllowedEncodings()."
-      + " See com.google.cloud.dataflow.sdk.coders.PrintBase64Encoding for how to generate"
-      + " new test data.";
-
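-  // As an illustration (assuming StringUtf8Coder as the coder under test): in the outer
-  // context, "abc" encodes to the raw UTF-8 bytes {0x61, 0x62, 0x63}, whose base64 form is
-  // "YWJj", so a wire-format test could assert:
-  //   coderEncodesBase64(StringUtf8Coder.of(), "abc", "YWJj");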
-  public static <T> void coderEncodesBase64(Coder<T> coder, T value, String base64Encoding)
-      throws Exception {
-    assertThat(ENCODING_WIRE_FORMAT_MESSAGE, CoderUtils.encodeToBase64(coder, value),
-        equalTo(base64Encoding));
-  }
-
-  public static <T> void coderEncodesBase64(
-      Coder<T> coder, List<T> values, List<String> base64Encodings) throws Exception {
-    assertThat("List of base64 encodings has different size than List of values",
-        base64Encodings.size(), equalTo(values.size()));
-
-    for (int i = 0; i < base64Encodings.size(); i++) {
-      coderEncodesBase64(coder, values.get(i), base64Encodings.get(i));
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public static <T, IterableT extends Iterable<T>> void coderDecodesBase64ContentsEqual(
-      Coder<IterableT> coder, String base64Encoding, IterableT expected) throws Exception {
-
-    IterableT result = CoderUtils.decodeFromBase64(coder, base64Encoding);
-    if (Iterables.isEmpty(expected)) {
-      assertThat(ENCODING_WIRE_FORMAT_MESSAGE, result, emptyIterable());
-    } else {
-      assertThat(ENCODING_WIRE_FORMAT_MESSAGE, result,
-          containsInAnyOrder((T[]) Iterables.toArray(expected, Object.class)));
-    }
-  }
-
-  public static <T, IterableT extends Iterable<T>> void coderDecodesBase64ContentsEqual(
-      Coder<IterableT> coder, List<String> base64Encodings, List<IterableT> expected)
-          throws Exception {
-    assertThat("List of base64 encodings has different size than List of values",
-        base64Encodings.size(), equalTo(expected.size()));
-
-    for (int i = 0; i < base64Encodings.size(); i++) {
-      coderDecodesBase64ContentsEqual(coder, base64Encodings.get(i), expected.get(i));
-    }
-  }
-
-  //////////////////////////////////////////////////////////////////////////
-
-  @VisibleForTesting
-  static <T> byte[] encode(
-      Coder<T> coder, Coder.Context context, T value) throws CoderException, IOException {
-    @SuppressWarnings("unchecked")
-    Coder<T> deserializedCoder = Serializer.deserialize(coder.asCloudObject(), Coder.class);
-
-    ByteArrayOutputStream os = new ByteArrayOutputStream();
-    deserializedCoder.encode(value, new UnownedOutputStream(os), context);
-    return os.toByteArray();
-  }
-
-  @VisibleForTesting
-  static <T> T decode(
-      Coder<T> coder, Coder.Context context, byte[] bytes) throws CoderException, IOException {
-    @SuppressWarnings("unchecked")
-    Coder<T> deserializedCoder = Serializer.deserialize(coder.asCloudObject(), Coder.class);
-
-    ByteArrayInputStream is = new ByteArrayInputStream(bytes);
-    return deserializedCoder.decode(new UnownedInputStream(is), context);
-  }
-
-  private static <T> T decodeEncode(Coder<T> coder, Coder.Context context, T value)
-      throws CoderException, IOException {
-    return decode(coder, context, encode(coder, context, value));
-  }
-}