You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:21 UTC
[21/53] [abbrv] beam git commit: jstorm-runner: move most classes to
translation package and reduece their visibility to package private.
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
deleted file mode 100644
index 6e3392c..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
-import org.apache.beam.runners.jstorm.translation.runtime.MultiOutputDoFnExecutor;
-import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.PValueBase;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Translates a ParDo.BoundMulti to a Storm {@link DoFnExecutor}.
- */
-public class ParDoBoundMultiTranslator<InputT, OutputT>
- extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> {
-
- @Override
- public void translateNode(
- ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
- final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- final TupleTag<InputT> inputTag = (TupleTag<InputT>) userGraphContext.getInputTag();
- PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
-
- Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs());
- Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap();
- for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) {
- Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator();
- localToExternalTupleTagMap.put(entry.getKey(), itr.next());
- }
-
- TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
- List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags();
- sideOutputTags.remove(mainOutputTag);
-
- Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs());
- for (PCollectionView pCollectionView : transform.getSideInputs()) {
- allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
- }
- String description = describeTransform(
- transform,
- allInputs,
- allOutputs);
-
- ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
- for (PCollectionView pCollectionView : transform.getSideInputs()) {
- sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
- }
-
- DoFnExecutor executor;
- DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
- if (signature.stateDeclarations().size() > 0
- || signature.timerDeclarations().size() > 0) {
- executor = new MultiStatefulDoFnExecutor<>(
- userGraphContext.getStepName(),
- description,
- userGraphContext.getOptions(),
- (DoFn<KV, OutputT>) transform.getFn(),
- (Coder) WindowedValue.getFullCoder(
- input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
- input.getWindowingStrategy(),
- (TupleTag<KV>) inputTag,
- transform.getSideInputs(),
- sideInputTagToView.build(),
- mainOutputTag,
- sideOutputTags,
- localToExternalTupleTagMap);
- } else {
- executor = new MultiOutputDoFnExecutor<>(
- userGraphContext.getStepName(),
- description,
- userGraphContext.getOptions(),
- transform.getFn(),
- WindowedValue.getFullCoder(
- input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
- input.getWindowingStrategy(),
- inputTag,
- transform.getSideInputs(),
- sideInputTagToView.build(),
- mainOutputTag,
- sideOutputTags,
- localToExternalTupleTagMap);
- }
-
- context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
deleted file mode 100644
index ad8f85f..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
-import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Translates a ParDo.Bound to a JStorm {@link DoFnExecutor}.
- */
-public class ParDoBoundTranslator<InputT, OutputT>
- extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> {
-
- private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class);
-
- @Override
- public void translateNode(
- ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) {
- final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- final TupleTag<?> inputTag = userGraphContext.getInputTag();
- PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
-
- TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
- List<TupleTag<?>> sideOutputTags = Lists.newArrayList();
-
- Map<TupleTag<?>, PValue> allInputs =
- avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs());
- for (PCollectionView pCollectionView : transform.getSideInputs()) {
- allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
- }
- String description = describeTransform(
- transform,
- allInputs,
- userGraphContext.getOutputs());
-
- ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
- for (PCollectionView pCollectionView : transform.getSideInputs()) {
- sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
- }
-
- DoFnExecutor executor;
- DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
- if (signature.stateDeclarations().size() > 0
- || signature.timerDeclarations().size() > 0) {
- executor = new StatefulDoFnExecutor<>(
- userGraphContext.getStepName(),
- description,
- userGraphContext.getOptions(),
- (DoFn<KV, OutputT>) transform.getFn(),
- (Coder) WindowedValue.getFullCoder(
- input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
- input.getWindowingStrategy(),
- (TupleTag<KV>) inputTag,
- transform.getSideInputs(),
- sideInputTagToView.build(),
- mainOutputTag,
- sideOutputTags);
- } else {
- executor = new DoFnExecutor<>(
- userGraphContext.getStepName(),
- description,
- userGraphContext.getOptions(),
- transform.getFn(),
- WindowedValue.getFullCoder(
- input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
- input.getWindowingStrategy(),
- (TupleTag<InputT>) inputTag,
- transform.getSideInputs(),
- sideInputTagToView.build(),
- mainOutputTag,
- sideOutputTags);
- }
-
- context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
deleted file mode 100644
index 71243b9..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.auto.value.AutoValue;
-import java.util.List;
-import javax.annotation.Nullable;
-
-/**
- * Class that defines the stream connection between upstream and downstream components.
- */
-@AutoValue
-public abstract class Stream {
-
- public abstract Producer getProducer();
-
- public abstract Consumer getConsumer();
-
- public static Stream of(Producer producer, Consumer consumer) {
- return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream(
- producer, consumer);
- }
-
- /**
- * JStorm producer.
- */
- @AutoValue
- public abstract static class Producer {
- public abstract String getComponentId();
-
- public abstract String getStreamId();
-
- public abstract String getStreamName();
-
- public static Producer of(String componentId, String streamId, String streamName) {
- return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Producer(
- componentId, streamId, streamName);
- }
- }
-
- /**
- * JStorm consumer.
- */
- @AutoValue
- public abstract static class Consumer {
- public abstract String getComponentId();
-
- public abstract Grouping getGrouping();
-
- public static Consumer of(String componentId, Grouping grouping) {
- return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Consumer(
- componentId, grouping);
- }
- }
-
- /**
- * JStorm grouping, which define how to transfer message between two nodes.
- */
- @AutoValue
- public abstract static class Grouping {
- public abstract Type getType();
-
- @Nullable
- public abstract List<String> getFields();
-
- public static Grouping of(Type type) {
- checkArgument(!Type.FIELDS.equals(type), "Fields grouping should provide key fields.");
- return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping(
- type, null /* fields */);
- }
-
- public static Grouping byFields(List<String> fields) {
- checkNotNull(fields, "fields");
- checkArgument(!fields.isEmpty(), "No key fields were provided for field grouping!");
- return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping(
- Type.FIELDS, fields);
- }
-
- /**
- * Types of stream groupings Storm allows.
- */
- public enum Type {
- ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
deleted file mode 100644
index bfa94a0..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.FluentIterable;
-import java.util.Map;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Interface for classes capable of tranforming Beam PTransforms into Storm primitives.
- */
-public interface TransformTranslator<T extends PTransform<?, ?>> {
-
- void translateNode(T transform, TranslationContext context);
-
- /**
- * Returns true if this translator can translate the given transform.
- */
- boolean canTranslate(T transform, TranslationContext context);
-
- /**
- * Default translator.
- * @param <T1>
- */
- class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
- @Override
- public void translateNode(T1 transform, TranslationContext context) {
-
- }
-
- @Override
- public boolean canTranslate(T1 transform, TranslationContext context) {
- return true;
- }
-
- static String describeTransform(
- PTransform<?, ?> transform,
- Map<TupleTag<?>, PValue> inputs,
- Map<TupleTag<?>, PValue> outputs) {
- return String.format("%s --> %s --> %s",
- Joiner.on('+').join(FluentIterable.from(inputs.entrySet())
- .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
- @Override
- public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) {
- return taggedPValue.getKey().getId();
- // return taggedPValue.getValue().getName();
- }
- })),
- transform.getName(),
- Joiner.on('+').join(FluentIterable.from(outputs.entrySet())
- .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
- @Override
- public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) {
- return taggedPvalue.getKey().getId();
- //return taggedPValue.getValue().getName();
- }
- })));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
deleted file mode 100644
index 33ac024..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Translates a Read.Unbounded into a Storm spout.
- *
- * @param <T>
- */
-public class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbounded<T>> {
- public void translateNode(Read.Unbounded<T> transform, TranslationContext context) {
- TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- String description =
- describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
-
- TupleTag<?> tag = userGraphContext.getOutputTag();
- PValue output = userGraphContext.getOutput();
-
- UnboundedSourceSpout spout = new UnboundedSourceSpout(
- description,
- transform.getSource(), userGraphContext.getOptions(), tag);
- context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
deleted file mode 100644
index f71ee9c..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.ViewExecutor;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
-
-/**
- * A {@link TransformTranslator} for executing {@link View Views} in JStorm runner.
- */
-public class ViewTranslator
- extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> {
- @Override
- public void translateNode(
- CreateJStormPCollectionView<?, ?> transform, TranslationContext context) {
- TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- String description = describeTransform(
- transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
- ViewExecutor viewExecutor = new ViewExecutor(description, userGraphContext.getOutputTag());
- context.addTransformExecutor(viewExecutor);
- }
-
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}.
- */
- public static class ViewAsMap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-
- @SuppressWarnings("unused") // used via reflection in JstormRunner#apply()
- public ViewAsMap(View.AsMap<K, V> transform) {
- }
-
- @Override
- public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
- PCollectionView<Map<K, V>> view =
- PCollectionViews.mapView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- inputCoder.getKeyCoder().verifyDeterministic();
- } catch (Coder.NonDeterministicException e) {
- // TODO: log warning as other runners.
- }
-
- return input
- .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
- .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsMap";
- }
- }
-
- /**
- * Specialized expansion for {@link
- * View.AsMultimap View.AsMultimap}.
- */
- public static class ViewAsMultimap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
- public ViewAsMultimap(View.AsMultimap<K, V> transform) {
- }
-
- @Override
- public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
- PCollectionView<Map<K, Iterable<V>>> view =
- PCollectionViews.multimapView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- inputCoder.getKeyCoder().verifyDeterministic();
- } catch (Coder.NonDeterministicException e) {
- // TODO: log warning as other runners.
- }
-
- return input
- .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
- .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsMultimap";
- }
- }
-
- /**
- * Specialized implementation for
- * {@link View.AsList View.AsList}.
- */
- public static class ViewAsList<T>
- extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
- public ViewAsList(View.AsList<T> transform) {
- }
-
- @Override
- public PCollectionView<List<T>> expand(PCollection<T> input) {
- PCollectionView<List<T>> view =
- PCollectionViews.listView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
- .apply(CreateJStormPCollectionView.<T, List<T>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsList";
- }
- }
-
- /**
- * Specialized implementation for
- * {@link View.AsIterable View.AsIterable} for the
- * JStorm runner in streaming mode.
- */
- public static class ViewAsIterable<T>
- extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
- public ViewAsIterable(View.AsIterable<T> transform) {
- }
-
- @Override
- public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
- PCollectionView<Iterable<T>> view =
- PCollectionViews.iterableView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
- .apply(CreateJStormPCollectionView.<T, Iterable<T>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsIterable";
- }
- }
-
- /**
- * Specialized expansion for
- * {@link View.AsSingleton View.AsSingleton} for the
- * JStorm runner in streaming mode.
- */
- public static class ViewAsSingleton<T>
- extends PTransform<PCollection<T>, PCollectionView<T>> {
- private View.AsSingleton<T> transform;
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
- public ViewAsSingleton(View.AsSingleton<T> transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollectionView<T> expand(PCollection<T> input) {
- Combine.Globally<T, T> combine = Combine.globally(
- new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
- if (!transform.hasDefaultValue()) {
- combine = combine.withoutDefaults();
- }
- return input.apply(combine.asSingletonView());
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsSingleton";
- }
-
- private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
- private boolean hasDefaultValue;
- private T defaultValue;
-
- SingletonCombine(boolean hasDefaultValue, T defaultValue) {
- this.hasDefaultValue = hasDefaultValue;
- this.defaultValue = defaultValue;
- }
-
- @Override
- public T apply(T left, T right) {
- throw new IllegalArgumentException("PCollection with more than one element "
- + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
- + "combine the PCollection into a single value");
- }
-
- @Override
- public T identity() {
- if (hasDefaultValue) {
- return defaultValue;
- } else {
- throw new IllegalArgumentException(
- "Empty PCollection accessed as a singleton view. "
- + "Consider setting withDefault to provide a default value");
- }
- }
- }
- }
-
- /**
- * Specialized expansion for
- * {@link org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView}.
- * @param <InputT>
- * @param <OutputT>
- */
- public static class CombineGloballyAsSingletonView<InputT, OutputT>
- extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
- Combine.GloballyAsSingletonView<InputT, OutputT> transform;
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
- public CombineGloballyAsSingletonView(
- Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollectionView<OutputT> expand(PCollection<InputT> input) {
- PCollection<OutputT> combined =
- input.apply(Combine.globally(transform.getCombineFn())
- .withoutDefaults()
- .withFanout(transform.getFanout()));
-
- PCollectionView<OutputT> view = PCollectionViews.singletonView(
- combined,
- combined.getWindowingStrategy(),
- transform.getInsertDefault(),
- transform.getInsertDefault()
- ? transform.getCombineFn().defaultValue() : null,
- combined.getCoder());
- return combined
- .apply(ParDo.of(new WrapAsList<OutputT>()))
- .apply(CreateJStormPCollectionView.<OutputT, OutputT>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingCombineGloballyAsSingletonView";
- }
- }
-
- private static class WrapAsList<T> extends DoFn<T, List<T>> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(Collections.singletonList(c.element()));
- }
- }
-
- /**
- * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
- * They require the input {@link PCollection} fits in memory.
- * For a large {@link PCollection} this is expected to crash!
- *
- * @param <T> the type of elements to concatenate.
- */
- private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public List<T> createAccumulator() {
- return new ArrayList<>();
- }
-
- @Override
- public List<T> addInput(List<T> accumulator, T input) {
- accumulator.add(input);
- return accumulator;
- }
-
- @Override
- public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
- List<T> result = createAccumulator();
- for (List<T> accumulator : accumulators) {
- result.addAll(accumulator);
- }
- return result;
- }
-
- @Override
- public List<T> extractOutput(List<T> accumulator) {
- return accumulator;
- }
-
- @Override
- public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
-
- @Override
- public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
- }
-
- /**
- * Creates a primitive {@link PCollectionView}.
- * For internal use only by runner implementors.
- *
- * @param <ElemT> The type of the elements of the input PCollection
- * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
- */
- public static class CreateJStormPCollectionView<ElemT, ViewT>
- extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
- private PCollectionView<ViewT> view;
-
- private CreateJStormPCollectionView(PCollectionView<ViewT> view) {
- this.view = view;
- }
-
- public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of(
- PCollectionView<ViewT> view) {
- return new CreateJStormPCollectionView<>(view);
- }
-
- @Override
- public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
- return view;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
deleted file mode 100644
index 2ccb8d7..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.WindowAssignExecutor;
-import org.apache.beam.sdk.transforms.windowing.Window;
-
-/**
- * Translates a {@link org.apache.beam.sdk.transforms.windowing.Window.Assign} to a
- * JStorm {@link WindowAssignExecutor}.
- * @param <T>
- */
-public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
-
- @Override
- public void translateNode(Window.Assign<T> transform, TranslationContext context) {
- TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- String description =
- describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
- context.getUserGraphContext().setWindowed();
- WindowAssignExecutor executor = new WindowAssignExecutor(
- description,
- transform.getWindowFn(),
- userGraphContext.getOutputTag());
- context.addTransformExecutor(executor);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
deleted file mode 100644
index 4b92a4c..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.util;
-
-/**
- * Common definition of JStorm runner.
- */
-public class CommonInstance {
- public static final String KEY = "Key";
- public static final String VALUE = "Value";
-
- public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK";
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
deleted file mode 100644
index 4eb1d8f..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.IOException;
-import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Default StepContext for running DoFn This does not allow accessing state or timer internals.
- */
-public class DefaultStepContext implements ExecutionContext.StepContext {
-
- private TimerInternals timerInternals;
-
- private StateInternals stateInternals;
-
- public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) {
- this.timerInternals = checkNotNull(timerInternals, "timerInternals");
- this.stateInternals = checkNotNull(stateInternals, "stateInternals");
- }
-
- @Override
- public String getStepName() {
- return null;
- }
-
- @Override
- public String getTransformName() {
- return null;
- }
-
- @Override
- public void noteOutput(WindowedValue<?> windowedValue) {
-
- }
-
- @Override
- public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) {
-
- }
-
- @Override
- public <T, W extends BoundedWindow> void writePCollectionViewData(
- TupleTag<?> tag, Iterable<WindowedValue<T>> data,
- Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder)
- throws IOException {
- throw new UnsupportedOperationException("Writing side-input data is not supported.");
- }
-
- @Override
- public StateInternals stateInternals() {
- return stateInternals;
- }
-
- @Override
- public TimerInternals timerInternals() {
- return timerInternals;
- }
-
- public void setStateInternals(StateInternals stateInternals) {
- this.stateInternals = stateInternals;
- }
-
- public void setTimerInternals(TimerInternals timerInternals) {
- this.timerInternals = timerInternals;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
deleted file mode 100644
index ad83c2b..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.util;
-
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.jstorm.translation.runtime.Executor;
-import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor;
-import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor;
-import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * Utils for JStorm runner.
- */
-public class RunnerUtils {
- /**
- * Convert {@link WindowedValue} into {@link KeyedWorkItem}.
- * @param elem
- * @return
- */
- public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) {
- WindowedValue<KV<K, V>> kvElem = (WindowedValue<KV<K, V>>) elem;
- SingletonKeyedWorkItem<K, V> workItem = SingletonKeyedWorkItem.of(
- kvElem.getValue().getKey(),
- kvElem.withValue(kvElem.getValue().getValue()));
- return workItem;
- }
-
- public static boolean isGroupByKeyExecutor(Executor executor) {
- if (executor instanceof GroupByWindowExecutor) {
- return true;
- } else if (executor instanceof StatefulDoFnExecutor
- || executor instanceof MultiStatefulDoFnExecutor) {
- return true;
- } else {
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
deleted file mode 100644
index 479afdc..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.jstorm.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
- */
-public class SerializedPipelineOptions implements Serializable {
-
- private final byte[] serializedOptions;
-
- /**
- * Lazily initialized copy of deserialized options.
- */
- private transient PipelineOptions pipelineOptions;
-
- public SerializedPipelineOptions(PipelineOptions options) {
- checkNotNull(options, "PipelineOptions must not be null.");
-
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- new ObjectMapper().writeValue(baos, options);
- this.serializedOptions = baos.toByteArray();
- } catch (Exception e) {
- throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
- }
-
- }
-
- public PipelineOptions getPipelineOptions() {
- if (pipelineOptions == null) {
- try {
- pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
- } catch (IOException e) {
- throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
- }
- }
-
- return pipelineOptions;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
deleted file mode 100644
index 46a12b9..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.util;
-
-import java.util.Collections;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * Singleton keyed word item.
- * @param <K>
- * @param <ElemT>
- */
-public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
-
- final K key;
- final WindowedValue<ElemT> value;
-
- private SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) {
- this.key = key;
- this.value = value;
- }
-
- public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of(K key, WindowedValue<ElemT> value) {
- return new SingletonKeyedWorkItem<K, ElemT>(key, value);
- }
-
- @Override
- public K key() {
- return key;
- }
-
- public WindowedValue<ElemT> value() {
- return value;
- }
-
- @Override
- public Iterable<TimerInternals.TimerData> timersIterable() {
- return Collections.EMPTY_LIST;
- }
-
- @Override
- public Iterable<WindowedValue<ElemT>> elementsIterable() {
- return Collections.singletonList(value);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java
new file mode 100644
index 0000000..b2ca267
--- /dev/null
+++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.cache.rocksdb.RocksDbKvStoreManagerFactory;
+import com.alibaba.jstorm.utils.KryoSerializer;
+import com.google.common.collect.Maps;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link JStormStateInternals}.
+ */
+@RunWith(JUnit4.class)
+public class JStormStateInternalsTest {
+
+ @Rule
+ public final TemporaryFolder tmp = new TemporaryFolder();
+
+ private JStormStateInternals<String> jstormStateInternals;
+
+ @Before
+ public void setup() throws Exception {
+ IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager(
+ Maps.newHashMap(),
+ "test",
+ tmp.toString(),
+ new KryoSerializer(Maps.newHashMap()));
+ jstormStateInternals = new JStormStateInternals(
+ "key-1", kvStoreManager, new TimerServiceImpl(), 0);
+ }
+
+ @Test
+ public void testValueState() throws Exception {
+ ValueState<Integer> valueState = jstormStateInternals.state(
+ StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
+ valueState.write(Integer.MIN_VALUE);
+ assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
+ valueState.write(Integer.MAX_VALUE);
+ assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
+ }
+
+ @Test
+ public void testValueStateIdenticalId() throws Exception {
+ ValueState<Integer> valueState = jstormStateInternals.state(
+ StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
+ ValueState<Integer> valueStateIdentical = jstormStateInternals.state(
+ StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
+
+ valueState.write(Integer.MIN_VALUE);
+ assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
+ assertEquals(Integer.MIN_VALUE, valueStateIdentical.read().longValue());
+ valueState.write(Integer.MAX_VALUE);
+ assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
+ assertEquals(Integer.MAX_VALUE, valueStateIdentical.read().longValue());
+ }
+
+ @Test
+ public void testBagState() throws Exception {
+ BagState<Integer> bagStateA = jstormStateInternals.state(
+ StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
+ BagState<Integer> bagStateB = jstormStateInternals.state(
+ StateNamespaces.global(), StateTags.bag("state-id-b", BigEndianIntegerCoder.of()));
+
+ bagStateA.add(1);
+ bagStateA.add(0);
+ bagStateA.add(Integer.MAX_VALUE);
+
+ bagStateB.add(0);
+ bagStateB.add(Integer.MIN_VALUE);
+
+ Iterable<Integer> bagA = bagStateA.read();
+ Iterable<Integer> bagB = bagStateB.read();
+ assertThat(bagA, containsInAnyOrder(1, 0, Integer.MAX_VALUE));
+ assertThat(bagB, containsInAnyOrder(0, Integer.MIN_VALUE));
+
+ bagStateA.clear();
+ bagStateA.add(1);
+ bagStateB.add(0);
+ assertThat(bagStateA.read(), containsInAnyOrder(1));
+ assertThat(bagStateB.read(), containsInAnyOrder(0, 0, Integer.MIN_VALUE));
+ }
+
+ @Test
+ public void testCombiningState() throws Exception {
+ Combine.CombineFn<Integer, int[], Integer> combineFn = Max.ofIntegers();
+ Coder<int[]> accumCoder = combineFn.getAccumulatorCoder(
+ CoderRegistry.createDefault(), BigEndianIntegerCoder.of());
+
+ CombiningState<Integer, int[], Integer> combiningState = jstormStateInternals.state(
+ StateNamespaces.global(),
+ StateTags.combiningValue(
+ "state-id-a",
+ accumCoder,
+ combineFn));
+ assertEquals(Integer.MIN_VALUE, combiningState.read().longValue());
+ combiningState.add(10);
+ assertEquals(10, combiningState.read().longValue());
+ combiningState.add(1);
+ assertEquals(10, combiningState.read().longValue());
+ combiningState.add(Integer.MAX_VALUE);
+ assertEquals(Integer.MAX_VALUE, combiningState.read().longValue());
+ }
+
+ @Test
+ public void testWatermarkHoldState() throws Exception {
+ WatermarkHoldState watermarkHoldState = jstormStateInternals.state(
+ StateNamespaces.global(),
+ StateTags.watermarkStateInternal(
+ "state-id-a",
+ TimestampCombiner.EARLIEST));
+ watermarkHoldState.add(new Instant(1));
+ assertEquals(1, watermarkHoldState.read().getMillis());
+ watermarkHoldState.add(new Instant(Integer.MIN_VALUE));
+ assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
+ watermarkHoldState.add(new Instant(Integer.MAX_VALUE));
+ assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
+ }
+
+ @Test
+ public void testMapState() throws Exception {
+ MapState<Integer, Integer> mapStateA = jstormStateInternals.state(
+ StateNamespaces.global(),
+ StateTags.map("state-id-a", BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of()));
+ mapStateA.put(1, 1);
+ mapStateA.put(2, 22);
+ mapStateA.put(1, 12);
+
+ Iterable<Integer> keys = mapStateA.keys().read();
+ Iterable<Integer> values = mapStateA.values().read();
+ assertThat(keys, containsInAnyOrder(1, 2));
+ assertThat(values, containsInAnyOrder(12, 22));
+
+ Iterable<Map.Entry<Integer, Integer>> entries = mapStateA.entries().read();
+ Iterator<Map.Entry<Integer, Integer>> itr = entries.iterator();
+ Map.Entry<Integer, Integer> entry = itr.next();
+ assertEquals((long) entry.getKey(), 1L);
+ assertEquals((long) entry.getValue(), 12L);
+ entry = itr.next();
+ assertEquals((long) entry.getKey(), 2L);
+ assertEquals((long) entry.getValue(), 22L);
+ assertEquals(false, itr.hasNext());
+
+ mapStateA.remove(1);
+ keys = mapStateA.keys().read();
+ values = mapStateA.values().read();
+ assertThat(keys, containsInAnyOrder(2));
+ assertThat(values, containsInAnyOrder(22));
+
+ entries = mapStateA.entries().read();
+ itr = entries.iterator();
+ entry = itr.next();
+ assertEquals((long) entry.getKey(), 2L);
+ assertEquals((long) entry.getValue(), 22L);
+ assertEquals(false, itr.hasNext());
+ }
+
+ @Test
+ public void testMassiveDataOfBagState() {
+ BagState<Integer> bagStateA = jstormStateInternals.state(
+ StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
+
+ int count = 10000;
+ int n = 1;
+ while (n <= count) {
+ bagStateA.add(n);
+ n++;
+ }
+
+ int readCount = 0;
+ int readN = 0;
+ Iterator<Integer> itr = bagStateA.read().iterator();
+ while (itr.hasNext()) {
+ readN += itr.next();
+ readCount++;
+ }
+
+ assertEquals((long) readN, ((1 + count) * count) / 2);
+ assertEquals((long) readCount, count);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
deleted file mode 100644
index 66f33a7..0000000
--- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime.state;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.cache.rocksdb.RocksDbKvStoreManagerFactory;
-import com.alibaba.jstorm.utils.KryoSerializer;
-import com.google.common.collect.Maps;
-import java.util.Iterator;
-import java.util.Map;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.jstorm.translation.runtime.TimerServiceImpl;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.CombiningState;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link JStormStateInternals}.
- */
-@RunWith(JUnit4.class)
-public class JStormStateInternalsTest {
-
- @Rule
- public final TemporaryFolder tmp = new TemporaryFolder();
-
- private JStormStateInternals<String> jstormStateInternals;
-
- @Before
- public void setup() throws Exception {
- IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager(
- Maps.newHashMap(),
- "test",
- tmp.toString(),
- new KryoSerializer(Maps.newHashMap()));
- jstormStateInternals = new JStormStateInternals(
- "key-1", kvStoreManager, new TimerServiceImpl(), 0);
- }
-
- @Test
- public void testValueState() throws Exception {
- ValueState<Integer> valueState = jstormStateInternals.state(
- StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
- valueState.write(Integer.MIN_VALUE);
- assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
- valueState.write(Integer.MAX_VALUE);
- assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
- }
-
- @Test
- public void testValueStateIdenticalId() throws Exception {
- ValueState<Integer> valueState = jstormStateInternals.state(
- StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
- ValueState<Integer> valueStateIdentical = jstormStateInternals.state(
- StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
-
- valueState.write(Integer.MIN_VALUE);
- assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
- assertEquals(Integer.MIN_VALUE, valueStateIdentical.read().longValue());
- valueState.write(Integer.MAX_VALUE);
- assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
- assertEquals(Integer.MAX_VALUE, valueStateIdentical.read().longValue());
- }
-
- @Test
- public void testBagState() throws Exception {
- BagState<Integer> bagStateA = jstormStateInternals.state(
- StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
- BagState<Integer> bagStateB = jstormStateInternals.state(
- StateNamespaces.global(), StateTags.bag("state-id-b", BigEndianIntegerCoder.of()));
-
- bagStateA.add(1);
- bagStateA.add(0);
- bagStateA.add(Integer.MAX_VALUE);
-
- bagStateB.add(0);
- bagStateB.add(Integer.MIN_VALUE);
-
- Iterable<Integer> bagA = bagStateA.read();
- Iterable<Integer> bagB = bagStateB.read();
- assertThat(bagA, containsInAnyOrder(1, 0, Integer.MAX_VALUE));
- assertThat(bagB, containsInAnyOrder(0, Integer.MIN_VALUE));
-
- bagStateA.clear();
- bagStateA.add(1);
- bagStateB.add(0);
- assertThat(bagStateA.read(), containsInAnyOrder(1));
- assertThat(bagStateB.read(), containsInAnyOrder(0, 0, Integer.MIN_VALUE));
- }
-
- @Test
- public void testCombiningState() throws Exception {
- Combine.CombineFn<Integer, int[], Integer> combineFn = Max.ofIntegers();
- Coder<int[]> accumCoder = combineFn.getAccumulatorCoder(
- CoderRegistry.createDefault(), BigEndianIntegerCoder.of());
-
- CombiningState<Integer, int[], Integer> combiningState = jstormStateInternals.state(
- StateNamespaces.global(),
- StateTags.combiningValue(
- "state-id-a",
- accumCoder,
- combineFn));
- assertEquals(Integer.MIN_VALUE, combiningState.read().longValue());
- combiningState.add(10);
- assertEquals(10, combiningState.read().longValue());
- combiningState.add(1);
- assertEquals(10, combiningState.read().longValue());
- combiningState.add(Integer.MAX_VALUE);
- assertEquals(Integer.MAX_VALUE, combiningState.read().longValue());
- }
-
- @Test
- public void testWatermarkHoldState() throws Exception {
- WatermarkHoldState watermarkHoldState = jstormStateInternals.state(
- StateNamespaces.global(),
- StateTags.watermarkStateInternal(
- "state-id-a",
- TimestampCombiner.EARLIEST));
- watermarkHoldState.add(new Instant(1));
- assertEquals(1, watermarkHoldState.read().getMillis());
- watermarkHoldState.add(new Instant(Integer.MIN_VALUE));
- assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
- watermarkHoldState.add(new Instant(Integer.MAX_VALUE));
- assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
- }
-
- @Test
- public void testMapState() throws Exception {
- MapState<Integer, Integer> mapStateA = jstormStateInternals.state(
- StateNamespaces.global(),
- StateTags.map("state-id-a", BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of()));
- mapStateA.put(1, 1);
- mapStateA.put(2, 22);
- mapStateA.put(1, 12);
-
- Iterable<Integer> keys = mapStateA.keys().read();
- Iterable<Integer> values = mapStateA.values().read();
- assertThat(keys, containsInAnyOrder(1, 2));
- assertThat(values, containsInAnyOrder(12, 22));
-
- Iterable<Map.Entry<Integer, Integer>> entries = mapStateA.entries().read();
- Iterator<Map.Entry<Integer, Integer>> itr = entries.iterator();
- Map.Entry<Integer, Integer> entry = itr.next();
- assertEquals((long) entry.getKey(), 1L);
- assertEquals((long) entry.getValue(), 12L);
- entry = itr.next();
- assertEquals((long) entry.getKey(), 2L);
- assertEquals((long) entry.getValue(), 22L);
- assertEquals(false, itr.hasNext());
-
- mapStateA.remove(1);
- keys = mapStateA.keys().read();
- values = mapStateA.values().read();
- assertThat(keys, containsInAnyOrder(2));
- assertThat(values, containsInAnyOrder(22));
-
- entries = mapStateA.entries().read();
- itr = entries.iterator();
- entry = itr.next();
- assertEquals((long) entry.getKey(), 2L);
- assertEquals((long) entry.getValue(), 22L);
- assertEquals(false, itr.hasNext());
- }
-
- @Test
- public void testMassiveDataOfBagState() {
- BagState<Integer> bagStateA = jstormStateInternals.state(
- StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
-
- int count = 10000;
- int n = 1;
- while (n <= count) {
- bagStateA.add(n);
- n++;
- }
-
- int readCount = 0;
- int readN = 0;
- Iterator<Integer> itr = bagStateA.read().iterator();
- while (itr.hasNext()) {
- readN += itr.next();
- readCount++;
- }
-
- assertEquals((long) readN, ((1 + count) * count) / 2);
- assertEquals((long) readCount, count);
- }
-}