You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:21 UTC

[21/53] [abbrv] beam git commit: jstorm-runner: move most classes to translation package and reduece their visibility to package private.

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
deleted file mode 100644
index 6e3392c..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
-import org.apache.beam.runners.jstorm.translation.runtime.MultiOutputDoFnExecutor;
-import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.PValueBase;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Translates a ParDo.BoundMulti to a Storm {@link DoFnExecutor}.
- */
-public class ParDoBoundMultiTranslator<InputT, OutputT>
-    extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> {
-
-  @Override
-  public void translateNode(
-      ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
-    final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-    final TupleTag<InputT> inputTag = (TupleTag<InputT>) userGraphContext.getInputTag();
-    PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
-
-    Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs());
-    Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap();
-    for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) {
-      Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator();
-      localToExternalTupleTagMap.put(entry.getKey(), itr.next());
-    }
-
-    TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
-    List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags();
-    sideOutputTags.remove(mainOutputTag);
-
-    Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs());
-    for (PCollectionView pCollectionView : transform.getSideInputs()) {
-      allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
-    }
-    String description = describeTransform(
-        transform,
-        allInputs,
-        allOutputs);
-
-    ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
-    for (PCollectionView pCollectionView : transform.getSideInputs()) {
-      sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
-    }
-
-    DoFnExecutor executor;
-    DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
-    if (signature.stateDeclarations().size() > 0
-        || signature.timerDeclarations().size() > 0) {
-      executor = new MultiStatefulDoFnExecutor<>(
-          userGraphContext.getStepName(),
-          description,
-          userGraphContext.getOptions(),
-          (DoFn<KV, OutputT>) transform.getFn(),
-          (Coder) WindowedValue.getFullCoder(
-              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
-          input.getWindowingStrategy(),
-          (TupleTag<KV>) inputTag,
-          transform.getSideInputs(),
-          sideInputTagToView.build(),
-          mainOutputTag,
-          sideOutputTags,
-          localToExternalTupleTagMap);
-    } else {
-      executor = new MultiOutputDoFnExecutor<>(
-          userGraphContext.getStepName(),
-          description,
-          userGraphContext.getOptions(),
-          transform.getFn(),
-          WindowedValue.getFullCoder(
-              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
-          input.getWindowingStrategy(),
-          inputTag,
-          transform.getSideInputs(),
-          sideInputTagToView.build(),
-          mainOutputTag,
-          sideOutputTags,
-          localToExternalTupleTagMap);
-    }
-
-    context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
deleted file mode 100644
index ad8f85f..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
-import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Translates a ParDo.Bound to a JStorm {@link DoFnExecutor}.
- */
-public class ParDoBoundTranslator<InputT, OutputT>
-    extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class);
-
-  @Override
-  public void translateNode(
-      ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) {
-    final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-    final TupleTag<?> inputTag = userGraphContext.getInputTag();
-    PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
-
-    TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
-    List<TupleTag<?>> sideOutputTags = Lists.newArrayList();
-
-    Map<TupleTag<?>, PValue> allInputs =
-        avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs());
-    for (PCollectionView pCollectionView : transform.getSideInputs()) {
-      allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
-    }
-    String description = describeTransform(
-        transform,
-        allInputs,
-        userGraphContext.getOutputs());
-
-    ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
-    for (PCollectionView pCollectionView : transform.getSideInputs()) {
-      sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
-    }
-
-    DoFnExecutor executor;
-    DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
-    if (signature.stateDeclarations().size() > 0
-        || signature.timerDeclarations().size() > 0) {
-      executor = new StatefulDoFnExecutor<>(
-          userGraphContext.getStepName(),
-          description,
-          userGraphContext.getOptions(),
-          (DoFn<KV, OutputT>) transform.getFn(),
-          (Coder) WindowedValue.getFullCoder(
-              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
-          input.getWindowingStrategy(),
-          (TupleTag<KV>) inputTag,
-          transform.getSideInputs(),
-          sideInputTagToView.build(),
-          mainOutputTag,
-          sideOutputTags);
-    } else {
-      executor = new DoFnExecutor<>(
-          userGraphContext.getStepName(),
-          description,
-          userGraphContext.getOptions(),
-          transform.getFn(),
-          WindowedValue.getFullCoder(
-              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
-          input.getWindowingStrategy(),
-          (TupleTag<InputT>) inputTag,
-          transform.getSideInputs(),
-          sideInputTagToView.build(),
-          mainOutputTag,
-          sideOutputTags);
-    }
-
-    context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
deleted file mode 100644
index 71243b9..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.auto.value.AutoValue;
-import java.util.List;
-import javax.annotation.Nullable;
-
-/**
- * Class that defines the stream connection between upstream and downstream components.
- */
-@AutoValue
-public abstract class Stream {
-
-  public abstract Producer getProducer();
-
-  public abstract Consumer getConsumer();
-
-  public static Stream of(Producer producer, Consumer consumer) {
-    return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream(
-        producer, consumer);
-  }
-
-  /**
-   * JStorm producer.
-   */
-  @AutoValue
-  public abstract static class Producer {
-    public abstract String getComponentId();
-
-    public abstract String getStreamId();
-
-    public abstract String getStreamName();
-
-    public static Producer of(String componentId, String streamId, String streamName) {
-      return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Producer(
-          componentId, streamId, streamName);
-    }
-  }
-
-  /**
-   * JStorm consumer.
-   */
-  @AutoValue
-  public abstract static class Consumer {
-    public abstract String getComponentId();
-
-    public abstract Grouping getGrouping();
-
-    public static Consumer of(String componentId, Grouping grouping) {
-      return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Consumer(
-          componentId, grouping);
-    }
-  }
-
-  /**
-   * JStorm grouping, which define how to transfer message between two nodes.
-   */
-  @AutoValue
-  public abstract static class Grouping {
-    public abstract Type getType();
-
-    @Nullable
-    public abstract List<String> getFields();
-
-    public static Grouping of(Type type) {
-      checkArgument(!Type.FIELDS.equals(type), "Fields grouping should provide key fields.");
-      return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping(
-          type, null /* fields */);
-    }
-
-    public static Grouping byFields(List<String> fields) {
-      checkNotNull(fields, "fields");
-      checkArgument(!fields.isEmpty(), "No key fields were provided for field grouping!");
-      return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping(
-          Type.FIELDS, fields);
-    }
-
-    /**
-     * Types of stream groupings Storm allows.
-     */
-    public enum Type {
-      ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
deleted file mode 100644
index bfa94a0..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.FluentIterable;
-import java.util.Map;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Interface for classes capable of tranforming Beam PTransforms into Storm primitives.
- */
-public interface TransformTranslator<T extends PTransform<?, ?>> {
-
-  void translateNode(T transform, TranslationContext context);
-
-  /**
-   * Returns true if this translator can translate the given transform.
-   */
-  boolean canTranslate(T transform, TranslationContext context);
-
-    /**
-     * Default translator.
-     * @param <T1>
-     */
-  class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
-    @Override
-    public void translateNode(T1 transform, TranslationContext context) {
-
-    }
-
-    @Override
-    public boolean canTranslate(T1 transform, TranslationContext context) {
-      return true;
-    }
-
-    static String describeTransform(
-        PTransform<?, ?> transform,
-        Map<TupleTag<?>, PValue> inputs,
-        Map<TupleTag<?>, PValue> outputs) {
-      return String.format("%s --> %s --> %s",
-          Joiner.on('+').join(FluentIterable.from(inputs.entrySet())
-              .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
-                @Override
-                public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) {
-                  return taggedPValue.getKey().getId();
-                  // return taggedPValue.getValue().getName();
-                }
-              })),
-          transform.getName(),
-          Joiner.on('+').join(FluentIterable.from(outputs.entrySet())
-              .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
-                @Override
-                public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) {
-                  return taggedPvalue.getKey().getId();
-                  //return taggedPValue.getValue().getName();
-                }
-              })));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
deleted file mode 100644
index 33ac024..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Translates a Read.Unbounded into a Storm spout.
- *
- * @param <T>
- */
-public class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbounded<T>> {
-  public void translateNode(Read.Unbounded<T> transform, TranslationContext context) {
-    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-    String description =
-        describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
-
-    TupleTag<?> tag = userGraphContext.getOutputTag();
-    PValue output = userGraphContext.getOutput();
-
-    UnboundedSourceSpout spout = new UnboundedSourceSpout(
-        description,
-        transform.getSource(), userGraphContext.getOptions(), tag);
-    context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
deleted file mode 100644
index f71ee9c..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.ViewExecutor;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
-
-/**
- * A {@link TransformTranslator} for executing {@link View Views} in JStorm runner.
- */
-public class ViewTranslator
-    extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> {
-  @Override
-  public void translateNode(
-      CreateJStormPCollectionView<?, ?> transform, TranslationContext context) {
-    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-    String description = describeTransform(
-        transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
-    ViewExecutor viewExecutor = new ViewExecutor(description, userGraphContext.getOutputTag());
-    context.addTransformExecutor(viewExecutor);
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}.
-   */
-  public static class ViewAsMap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-
-    @SuppressWarnings("unused") // used via reflection in JstormRunner#apply()
-    public ViewAsMap(View.AsMap<K, V> transform) {
-    }
-
-    @Override
-    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
-      PCollectionView<Map<K, V>> view =
-          PCollectionViews.mapView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (Coder.NonDeterministicException e) {
-        // TODO: log warning as other runners.
-      }
-
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-          .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMap";
-    }
-  }
-
-  /**
-   * Specialized expansion for {@link
-   * View.AsMultimap View.AsMultimap}.
-   */
-  public static class ViewAsMultimap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
-    public ViewAsMultimap(View.AsMultimap<K, V> transform) {
-    }
-
-    @Override
-    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
-      PCollectionView<Map<K, Iterable<V>>> view =
-          PCollectionViews.multimapView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (Coder.NonDeterministicException e) {
-        // TODO: log warning as other runners.
-      }
-
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-          .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMultimap";
-    }
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link View.AsList View.AsList}.
-   */
-  public static class ViewAsList<T>
-      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
-    public ViewAsList(View.AsList<T> transform) {
-    }
-
-    @Override
-    public PCollectionView<List<T>> expand(PCollection<T> input) {
-      PCollectionView<List<T>> view =
-          PCollectionViews.listView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-          .apply(CreateJStormPCollectionView.<T, List<T>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsList";
-    }
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link View.AsIterable View.AsIterable} for the
-   * JStorm runner in streaming mode.
-   */
-  public static class ViewAsIterable<T>
-      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
-    public ViewAsIterable(View.AsIterable<T> transform) {
-    }
-
-    @Override
-    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
-      PCollectionView<Iterable<T>> view =
-          PCollectionViews.iterableView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-          .apply(CreateJStormPCollectionView.<T, Iterable<T>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsIterable";
-    }
-  }
-
-  /**
-   * Specialized expansion for
-   * {@link View.AsSingleton View.AsSingleton} for the
-   * JStorm runner in streaming mode.
-   */
-  public static class ViewAsSingleton<T>
-      extends PTransform<PCollection<T>, PCollectionView<T>> {
-    private View.AsSingleton<T> transform;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
-    public ViewAsSingleton(View.AsSingleton<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PCollectionView<T> expand(PCollection<T> input) {
-      Combine.Globally<T, T> combine = Combine.globally(
-          new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
-      if (!transform.hasDefaultValue()) {
-        combine = combine.withoutDefaults();
-      }
-      return input.apply(combine.asSingletonView());
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsSingleton";
-    }
-
-    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
-      private boolean hasDefaultValue;
-      private T defaultValue;
-
-      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
-        this.hasDefaultValue = hasDefaultValue;
-        this.defaultValue = defaultValue;
-      }
-
-      @Override
-      public T apply(T left, T right) {
-        throw new IllegalArgumentException("PCollection with more than one element "
-            + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
-            + "combine the PCollection into a single value");
-      }
-
-      @Override
-      public T identity() {
-        if (hasDefaultValue) {
-          return defaultValue;
-        } else {
-          throw new IllegalArgumentException(
-              "Empty PCollection accessed as a singleton view. "
-                  + "Consider setting withDefault to provide a default value");
-        }
-      }
-    }
-  }
-
-  /**
-   * Specialized expansion for
-   * {@link org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView}.
-   * @param <InputT>
-   * @param <OutputT>
-     */
-  public static class CombineGloballyAsSingletonView<InputT, OutputT>
-      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
-    Combine.GloballyAsSingletonView<InputT, OutputT> transform;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public CombineGloballyAsSingletonView(
-        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
-      PCollection<OutputT> combined =
-          input.apply(Combine.globally(transform.getCombineFn())
-              .withoutDefaults()
-              .withFanout(transform.getFanout()));
-
-      PCollectionView<OutputT> view = PCollectionViews.singletonView(
-          combined,
-          combined.getWindowingStrategy(),
-          transform.getInsertDefault(),
-          transform.getInsertDefault()
-              ? transform.getCombineFn().defaultValue() : null,
-          combined.getCoder());
-      return combined
-          .apply(ParDo.of(new WrapAsList<OutputT>()))
-          .apply(CreateJStormPCollectionView.<OutputT, OutputT>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingCombineGloballyAsSingletonView";
-    }
-  }
-
-  private static class WrapAsList<T> extends DoFn<T, List<T>> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(Collections.singletonList(c.element()));
-    }
-  }
-
-  /**
-   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
-   * They require the input {@link PCollection} fits in memory.
-   * For a large {@link PCollection} this is expected to crash!
-   *
-   * @param <T> the type of elements to concatenate.
-   */
-  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public List<T> createAccumulator() {
-      return new ArrayList<>();
-    }
-
-    @Override
-    public List<T> addInput(List<T> accumulator, T input) {
-      accumulator.add(input);
-      return accumulator;
-    }
-
-    @Override
-    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
-      List<T> result = createAccumulator();
-      for (List<T> accumulator : accumulators) {
-        result.addAll(accumulator);
-      }
-      return result;
-    }
-
-    @Override
-    public List<T> extractOutput(List<T> accumulator) {
-      return accumulator;
-    }
-
-    @Override
-    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-
-    @Override
-    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-  }
-
-  /**
-   * Creates a primitive {@link PCollectionView}.
-   * For internal use only by runner implementors.
-   *
-   * @param <ElemT> The type of the elements of the input PCollection
-   * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
-   */
-  public static class CreateJStormPCollectionView<ElemT, ViewT>
-      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
-    private PCollectionView<ViewT> view;
-
-    private CreateJStormPCollectionView(PCollectionView<ViewT> view) {
-      this.view = view;
-    }
-
-    public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of(
-        PCollectionView<ViewT> view) {
-      return new CreateJStormPCollectionView<>(view);
-    }
-
-    @Override
-    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
-      return view;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
deleted file mode 100644
index 2ccb8d7..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.WindowAssignExecutor;
-import org.apache.beam.sdk.transforms.windowing.Window;
-
-/**
- * Translates a {@link org.apache.beam.sdk.transforms.windowing.Window.Assign} to a
- * JStorm {@link WindowAssignExecutor}.
- * @param <T>
- */
-public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
-
-  @Override
-  public void translateNode(Window.Assign<T> transform, TranslationContext context) {
-    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-    String description =
-        describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
-    context.getUserGraphContext().setWindowed();
-    WindowAssignExecutor executor = new WindowAssignExecutor(
-        description,
-        transform.getWindowFn(),
-        userGraphContext.getOutputTag());
-    context.addTransformExecutor(executor);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
deleted file mode 100644
index 4b92a4c..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.util;
-
-/**
- * Common definition of JStorm runner.
- */
-public class CommonInstance {
-  public static final String KEY = "Key";
-  public static final String VALUE = "Value";
-
-  public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK";
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
deleted file mode 100644
index 4eb1d8f..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.IOException;
-import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Default StepContext for running DoFn This does not allow accessing state or timer internals.
- */
-public class DefaultStepContext implements ExecutionContext.StepContext {
-
-  private TimerInternals timerInternals;
-
-  private StateInternals stateInternals;
-
-  public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) {
-    this.timerInternals = checkNotNull(timerInternals, "timerInternals");
-    this.stateInternals = checkNotNull(stateInternals, "stateInternals");
-  }
-
-  @Override
-  public String getStepName() {
-    return null;
-  }
-
-  @Override
-  public String getTransformName() {
-    return null;
-  }
-
-  @Override
-  public void noteOutput(WindowedValue<?> windowedValue) {
-
-  }
-
-  @Override
-  public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) {
-
-  }
-
-  @Override
-  public <T, W extends BoundedWindow> void writePCollectionViewData(
-      TupleTag<?> tag, Iterable<WindowedValue<T>> data,
-      Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder)
-      throws IOException {
-    throw new UnsupportedOperationException("Writing side-input data is not supported.");
-  }
-
-  @Override
-  public StateInternals stateInternals() {
-    return stateInternals;
-  }
-
-  @Override
-  public TimerInternals timerInternals() {
-    return timerInternals;
-  }
-
-  public void setStateInternals(StateInternals stateInternals) {
-    this.stateInternals = stateInternals;
-  }
-
-  public void setTimerInternals(TimerInternals timerInternals) {
-    this.timerInternals = timerInternals;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
deleted file mode 100644
index ad83c2b..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.util;
-
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.jstorm.translation.runtime.Executor;
-import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor;
-import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor;
-import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * Utils for JStorm runner.
- */
-public class RunnerUtils {
-  /**
-   * Convert {@link WindowedValue} into {@link KeyedWorkItem}.
-   * @param elem
-   * @return
-   */
-  public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) {
-    WindowedValue<KV<K, V>> kvElem = (WindowedValue<KV<K, V>>) elem;
-    SingletonKeyedWorkItem<K, V> workItem = SingletonKeyedWorkItem.of(
-        kvElem.getValue().getKey(),
-        kvElem.withValue(kvElem.getValue().getValue()));
-    return workItem;
-  }
-
-  public static boolean isGroupByKeyExecutor(Executor executor) {
-    if (executor instanceof GroupByWindowExecutor) {
-      return true;
-    } else if (executor instanceof StatefulDoFnExecutor
-            || executor instanceof MultiStatefulDoFnExecutor) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
deleted file mode 100644
index 479afdc..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.jstorm.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
- */
-public class SerializedPipelineOptions implements Serializable {
-
-  private final byte[] serializedOptions;
-
-  /**
-   * Lazily initialized copy of deserialized options.
-   */
-  private transient PipelineOptions pipelineOptions;
-
-  public SerializedPipelineOptions(PipelineOptions options) {
-    checkNotNull(options, "PipelineOptions must not be null.");
-
-    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-      new ObjectMapper().writeValue(baos, options);
-      this.serializedOptions = baos.toByteArray();
-    } catch (Exception e) {
-      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
-    }
-
-  }
-
-  public PipelineOptions getPipelineOptions() {
-    if (pipelineOptions == null) {
-      try {
-        pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
-      } catch (IOException e) {
-        throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
-      }
-    }
-
-    return pipelineOptions;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
deleted file mode 100644
index 46a12b9..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.util;
-
-import java.util.Collections;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * Singleton keyed word item.
- * @param <K>
- * @param <ElemT>
- */
-public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
-
-  final K key;
-  final WindowedValue<ElemT> value;
-
-  private SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) {
-    this.key = key;
-    this.value = value;
-  }
-
-  public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of(K key, WindowedValue<ElemT> value) {
-    return new SingletonKeyedWorkItem<K, ElemT>(key, value);
-  }
-
-  @Override
-  public K key() {
-    return key;
-  }
-
-  public WindowedValue<ElemT> value() {
-    return value;
-  }
-
-  @Override
-  public Iterable<TimerInternals.TimerData> timersIterable() {
-    return Collections.EMPTY_LIST;
-  }
-
-  @Override
-  public Iterable<WindowedValue<ElemT>> elementsIterable() {
-    return Collections.singletonList(value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java
new file mode 100644
index 0000000..b2ca267
--- /dev/null
+++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.cache.rocksdb.RocksDbKvStoreManagerFactory;
+import com.alibaba.jstorm.utils.KryoSerializer;
+import com.google.common.collect.Maps;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link JStormStateInternals}.
+ */
+@RunWith(JUnit4.class)
+public class JStormStateInternalsTest {
+
+  @Rule
+  public final TemporaryFolder tmp = new TemporaryFolder();
+
+  private JStormStateInternals<String> jstormStateInternals;
+
+  @Before
+  public void setup() throws Exception {
+    IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager(
+        Maps.newHashMap(),
+        "test",
+        tmp.toString(),
+        new KryoSerializer(Maps.newHashMap()));
+    jstormStateInternals = new JStormStateInternals(
+        "key-1", kvStoreManager, new TimerServiceImpl(), 0);
+  }
+
+  @Test
+  public void testValueState() throws Exception {
+    ValueState<Integer> valueState = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
+    valueState.write(Integer.MIN_VALUE);
+    assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
+    valueState.write(Integer.MAX_VALUE);
+    assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
+  }
+
+  @Test
+  public void testValueStateIdenticalId() throws Exception {
+    ValueState<Integer> valueState = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
+    ValueState<Integer> valueStateIdentical = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
+
+    valueState.write(Integer.MIN_VALUE);
+    assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
+    assertEquals(Integer.MIN_VALUE, valueStateIdentical.read().longValue());
+    valueState.write(Integer.MAX_VALUE);
+    assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
+    assertEquals(Integer.MAX_VALUE, valueStateIdentical.read().longValue());
+  }
+
+  @Test
+  public void testBagState() throws Exception {
+    BagState<Integer> bagStateA = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
+    BagState<Integer> bagStateB = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.bag("state-id-b", BigEndianIntegerCoder.of()));
+
+    bagStateA.add(1);
+    bagStateA.add(0);
+    bagStateA.add(Integer.MAX_VALUE);
+
+    bagStateB.add(0);
+    bagStateB.add(Integer.MIN_VALUE);
+
+    Iterable<Integer> bagA = bagStateA.read();
+    Iterable<Integer> bagB = bagStateB.read();
+    assertThat(bagA, containsInAnyOrder(1, 0, Integer.MAX_VALUE));
+    assertThat(bagB, containsInAnyOrder(0, Integer.MIN_VALUE));
+
+    bagStateA.clear();
+    bagStateA.add(1);
+    bagStateB.add(0);
+    assertThat(bagStateA.read(), containsInAnyOrder(1));
+    assertThat(bagStateB.read(), containsInAnyOrder(0, 0, Integer.MIN_VALUE));
+  }
+
+  @Test
+  public void testCombiningState() throws Exception {
+    Combine.CombineFn<Integer, int[], Integer> combineFn = Max.ofIntegers();
+    Coder<int[]> accumCoder = combineFn.getAccumulatorCoder(
+        CoderRegistry.createDefault(), BigEndianIntegerCoder.of());
+
+    CombiningState<Integer, int[], Integer> combiningState = jstormStateInternals.state(
+        StateNamespaces.global(),
+        StateTags.combiningValue(
+            "state-id-a",
+            accumCoder,
+            combineFn));
+    assertEquals(Integer.MIN_VALUE, combiningState.read().longValue());
+    combiningState.add(10);
+    assertEquals(10, combiningState.read().longValue());
+    combiningState.add(1);
+    assertEquals(10, combiningState.read().longValue());
+    combiningState.add(Integer.MAX_VALUE);
+    assertEquals(Integer.MAX_VALUE, combiningState.read().longValue());
+  }
+
+  @Test
+  public void testWatermarkHoldState() throws Exception {
+    WatermarkHoldState watermarkHoldState = jstormStateInternals.state(
+        StateNamespaces.global(),
+        StateTags.watermarkStateInternal(
+            "state-id-a",
+            TimestampCombiner.EARLIEST));
+    watermarkHoldState.add(new Instant(1));
+    assertEquals(1, watermarkHoldState.read().getMillis());
+    watermarkHoldState.add(new Instant(Integer.MIN_VALUE));
+    assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
+    watermarkHoldState.add(new Instant(Integer.MAX_VALUE));
+    assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
+  }
+
+  @Test
+  public void testMapState() throws Exception {
+    MapState<Integer, Integer> mapStateA = jstormStateInternals.state(
+        StateNamespaces.global(),
+        StateTags.map("state-id-a", BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of()));
+    mapStateA.put(1, 1);
+    mapStateA.put(2, 22);
+    mapStateA.put(1, 12);
+
+    Iterable<Integer> keys = mapStateA.keys().read();
+    Iterable<Integer> values = mapStateA.values().read();
+    assertThat(keys, containsInAnyOrder(1, 2));
+    assertThat(values, containsInAnyOrder(12, 22));
+
+    Iterable<Map.Entry<Integer, Integer>> entries = mapStateA.entries().read();
+    Iterator<Map.Entry<Integer, Integer>> itr = entries.iterator();
+    Map.Entry<Integer, Integer> entry = itr.next();
+    assertEquals((long) entry.getKey(), 1L);
+    assertEquals((long) entry.getValue(), 12L);
+    entry = itr.next();
+    assertEquals((long) entry.getKey(), 2L);
+    assertEquals((long) entry.getValue(), 22L);
+    assertEquals(false, itr.hasNext());
+
+    mapStateA.remove(1);
+    keys = mapStateA.keys().read();
+    values = mapStateA.values().read();
+    assertThat(keys, containsInAnyOrder(2));
+    assertThat(values, containsInAnyOrder(22));
+
+    entries = mapStateA.entries().read();
+    itr = entries.iterator();
+    entry = itr.next();
+    assertEquals((long) entry.getKey(), 2L);
+    assertEquals((long) entry.getValue(), 22L);
+    assertEquals(false, itr.hasNext());
+  }
+
+  @Test
+  public void testMassiveDataOfBagState() {
+    BagState<Integer> bagStateA = jstormStateInternals.state(
+        StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
+
+    int count = 10000;
+    int n = 1;
+    while (n <= count) {
+      bagStateA.add(n);
+      n++;
+    }
+
+    int readCount = 0;
+    int readN = 0;
+    Iterator<Integer> itr = bagStateA.read().iterator();
+    while (itr.hasNext()) {
+      readN += itr.next();
+      readCount++;
+    }
+
+    assertEquals((long) readN, ((1 + count) * count) / 2);
+    assertEquals((long) readCount, count);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
deleted file mode 100644
index 66f33a7..0000000
--- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime.state;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.cache.rocksdb.RocksDbKvStoreManagerFactory;
-import com.alibaba.jstorm.utils.KryoSerializer;
-import com.google.common.collect.Maps;
-import java.util.Iterator;
-import java.util.Map;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.jstorm.translation.runtime.TimerServiceImpl;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.CombiningState;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link JStormStateInternals}.
- */
-@RunWith(JUnit4.class)
-public class JStormStateInternalsTest {
-
-  @Rule
-  public final TemporaryFolder tmp = new TemporaryFolder();
-
-  private JStormStateInternals<String> jstormStateInternals;
-
-  @Before
-  public void setup() throws Exception {
-    IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager(
-        Maps.newHashMap(),
-        "test",
-        tmp.toString(),
-        new KryoSerializer(Maps.newHashMap()));
-    jstormStateInternals = new JStormStateInternals(
-        "key-1", kvStoreManager, new TimerServiceImpl(), 0);
-  }
-
-  @Test
-  public void testValueState() throws Exception {
-    ValueState<Integer> valueState = jstormStateInternals.state(
-        StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
-    valueState.write(Integer.MIN_VALUE);
-    assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
-    valueState.write(Integer.MAX_VALUE);
-    assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
-  }
-
-  @Test
-  public void testValueStateIdenticalId() throws Exception {
-    ValueState<Integer> valueState = jstormStateInternals.state(
-        StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
-    ValueState<Integer> valueStateIdentical = jstormStateInternals.state(
-        StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
-
-    valueState.write(Integer.MIN_VALUE);
-    assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
-    assertEquals(Integer.MIN_VALUE, valueStateIdentical.read().longValue());
-    valueState.write(Integer.MAX_VALUE);
-    assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
-    assertEquals(Integer.MAX_VALUE, valueStateIdentical.read().longValue());
-  }
-
-  @Test
-  public void testBagState() throws Exception {
-    BagState<Integer> bagStateA = jstormStateInternals.state(
-        StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
-    BagState<Integer> bagStateB = jstormStateInternals.state(
-        StateNamespaces.global(), StateTags.bag("state-id-b", BigEndianIntegerCoder.of()));
-
-    bagStateA.add(1);
-    bagStateA.add(0);
-    bagStateA.add(Integer.MAX_VALUE);
-
-    bagStateB.add(0);
-    bagStateB.add(Integer.MIN_VALUE);
-
-    Iterable<Integer> bagA = bagStateA.read();
-    Iterable<Integer> bagB = bagStateB.read();
-    assertThat(bagA, containsInAnyOrder(1, 0, Integer.MAX_VALUE));
-    assertThat(bagB, containsInAnyOrder(0, Integer.MIN_VALUE));
-
-    bagStateA.clear();
-    bagStateA.add(1);
-    bagStateB.add(0);
-    assertThat(bagStateA.read(), containsInAnyOrder(1));
-    assertThat(bagStateB.read(), containsInAnyOrder(0, 0, Integer.MIN_VALUE));
-  }
-
-  @Test
-  public void testCombiningState() throws Exception {
-    Combine.CombineFn<Integer, int[], Integer> combineFn = Max.ofIntegers();
-    Coder<int[]> accumCoder = combineFn.getAccumulatorCoder(
-        CoderRegistry.createDefault(), BigEndianIntegerCoder.of());
-
-    CombiningState<Integer, int[], Integer> combiningState = jstormStateInternals.state(
-        StateNamespaces.global(),
-        StateTags.combiningValue(
-            "state-id-a",
-            accumCoder,
-            combineFn));
-    assertEquals(Integer.MIN_VALUE, combiningState.read().longValue());
-    combiningState.add(10);
-    assertEquals(10, combiningState.read().longValue());
-    combiningState.add(1);
-    assertEquals(10, combiningState.read().longValue());
-    combiningState.add(Integer.MAX_VALUE);
-    assertEquals(Integer.MAX_VALUE, combiningState.read().longValue());
-  }
-
-  @Test
-  public void testWatermarkHoldState() throws Exception {
-    WatermarkHoldState watermarkHoldState = jstormStateInternals.state(
-        StateNamespaces.global(),
-        StateTags.watermarkStateInternal(
-            "state-id-a",
-            TimestampCombiner.EARLIEST));
-    watermarkHoldState.add(new Instant(1));
-    assertEquals(1, watermarkHoldState.read().getMillis());
-    watermarkHoldState.add(new Instant(Integer.MIN_VALUE));
-    assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
-    watermarkHoldState.add(new Instant(Integer.MAX_VALUE));
-    assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
-  }
-
-  @Test
-  public void testMapState() throws Exception {
-    MapState<Integer, Integer> mapStateA = jstormStateInternals.state(
-        StateNamespaces.global(),
-        StateTags.map("state-id-a", BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of()));
-    mapStateA.put(1, 1);
-    mapStateA.put(2, 22);
-    mapStateA.put(1, 12);
-
-    Iterable<Integer> keys = mapStateA.keys().read();
-    Iterable<Integer> values = mapStateA.values().read();
-    assertThat(keys, containsInAnyOrder(1, 2));
-    assertThat(values, containsInAnyOrder(12, 22));
-
-    Iterable<Map.Entry<Integer, Integer>> entries = mapStateA.entries().read();
-    Iterator<Map.Entry<Integer, Integer>> itr = entries.iterator();
-    Map.Entry<Integer, Integer> entry = itr.next();
-    assertEquals((long) entry.getKey(), 1L);
-    assertEquals((long) entry.getValue(), 12L);
-    entry = itr.next();
-    assertEquals((long) entry.getKey(), 2L);
-    assertEquals((long) entry.getValue(), 22L);
-    assertEquals(false, itr.hasNext());
-
-    mapStateA.remove(1);
-    keys = mapStateA.keys().read();
-    values = mapStateA.values().read();
-    assertThat(keys, containsInAnyOrder(2));
-    assertThat(values, containsInAnyOrder(22));
-
-    entries = mapStateA.entries().read();
-    itr = entries.iterator();
-    entry = itr.next();
-    assertEquals((long) entry.getKey(), 2L);
-    assertEquals((long) entry.getValue(), 22L);
-    assertEquals(false, itr.hasNext());
-  }
-
-  @Test
-  public void testMassiveDataOfBagState() {
-    BagState<Integer> bagStateA = jstormStateInternals.state(
-        StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
-
-    int count = 10000;
-    int n = 1;
-    while (n <= count) {
-      bagStateA.add(n);
-      n++;
-    }
-
-    int readCount = 0;
-    int readN = 0;
-    Iterator<Integer> itr = bagStateA.read().iterator();
-    while (itr.hasNext()) {
-      readN += itr.next();
-      readCount++;
-    }
-
-    assertEquals((long) readN, ((1 + count) * count) / 2);
-    assertEquals((long) readCount, count);
-  }
-}