You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:15 UTC
[15/53] [abbrv] beam git commit: jstorm-runner: fix checkstyles.
jstorm-runner: fix checkstyles.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aa251a4a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aa251a4a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aa251a4a
Branch: refs/heads/jstorm-runner
Commit: aa251a4a4d2850310f5dfd9db4d605cce41bba13
Parents: f3df3a2
Author: Pei He <pe...@apache.org>
Authored: Thu Jul 13 17:37:51 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:47 2017 +0800
----------------------------------------------------------------------
.../beam/runners/jstorm/JStormRunner.java | 395 +++++------
.../runners/jstorm/JStormRunnerRegistrar.java | 39 +-
.../beam/runners/jstorm/JStormRunnerResult.java | 118 ++--
.../beam/runners/jstorm/TestJStormRunner.java | 188 +++---
.../serialization/ImmutableListSerializer.java | 152 +++--
.../serialization/ImmutableMapSerializer.java | 78 ++-
.../serialization/ImmutableSetSerializer.java | 93 +--
.../KvStoreIterableSerializer.java | 73 +-
.../SdkRepackImmuListSerializer.java | 116 ++--
.../SdkRepackImmuSetSerializer.java | 98 +--
.../UnmodifiableCollectionsSerializer.java | 290 ++++----
.../translation/StormPipelineTranslator.java | 273 ++++----
.../jstorm/translation/TranslationContext.java | 667 ++++++++++---------
.../jstorm/translation/TranslatorRegistry.java | 65 +-
.../translation/runtime/AbstractComponent.java | 66 +-
.../translation/runtime/AdaptorBasicBolt.java | 2 +-
.../translation/runtime/AdaptorBasicSpout.java | 2 +-
.../translation/runtime/DoFnExecutor.java | 511 +++++++-------
.../runtime/DoFnRunnerWithMetrics.java | 3 +-
.../jstorm/translation/runtime/Executor.java | 13 +-
.../translation/runtime/ExecutorContext.java | 15 +-
.../translation/runtime/ExecutorsBolt.java | 502 +++++++-------
.../translation/runtime/FlattenExecutor.java | 61 +-
.../runtime/GroupByWindowExecutor.java | 231 ++++---
.../runtime/MultiOutputDoFnExecutor.java | 79 ++-
.../runtime/MultiStatefulDoFnExecutor.java | 64 +-
.../runtime/StatefulDoFnExecutor.java | 63 +-
.../translation/runtime/TimerService.java | 37 +-
.../translation/runtime/TimerServiceImpl.java | 233 +++----
.../translation/runtime/TxExecutorsBolt.java | 193 +++---
.../runtime/TxUnboundedSourceSpout.java | 244 +++----
.../runtime/UnboundedSourceSpout.java | 288 ++++----
.../translation/runtime/ViewExecutor.java | 53 +-
.../runtime/WindowAssignExecutor.java | 130 ++--
.../runtime/state/JStormBagState.java | 261 ++++----
.../runtime/state/JStormCombiningState.java | 98 +--
.../runtime/state/JStormMapState.java | 227 ++++---
.../runtime/state/JStormStateInternals.java | 290 ++++----
.../runtime/state/JStormValueState.java | 92 ++-
.../runtime/state/JStormWatermarkHoldState.java | 88 +--
.../runtime/timer/JStormTimerInternals.java | 143 ++--
.../translator/BoundedSourceTranslator.java | 29 +-
.../translator/CombineGloballyTranslator.java | 5 +-
.../translator/CombinePerKeyTranslator.java | 5 +-
.../translator/FlattenTranslator.java | 34 +-
.../translator/GroupByKeyTranslator.java | 71 +-
.../translator/ParDoBoundMultiTranslator.java | 143 ++--
.../translator/ParDoBoundTranslator.java | 128 ++--
.../translator/ReshuffleTranslator.java | 4 +-
.../jstorm/translation/translator/Stream.java | 109 +--
.../translator/TransformTranslator.java | 74 +-
.../translator/UnboundedSourceTranslator.java | 28 +-
.../translation/translator/ViewTranslator.java | 586 ++++++++--------
.../translator/WindowAssignTranslator.java | 26 +-
.../translator/WindowBoundTranslator.java | 26 +-
.../jstorm/translation/util/CommonInstance.java | 6 +-
.../util/DefaultSideInputReader.java | 33 +-
.../translation/util/DefaultStepContext.java | 89 +--
.../beam/runners/jstorm/util/RunnerUtils.java | 46 +-
.../jstorm/util/SerializedPipelineOptions.java | 51 +-
.../jstorm/util/SingletonKeyedWorkItem.java | 3 +-
.../jstorm/JStormRunnerRegistrarTest.java | 4 +-
.../runtime/state/JStormStateInternalsTest.java | 345 +++++-----
63 files changed, 4314 insertions(+), 4165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
index 39c723b..5fdbe4d 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.runners.jstorm;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
@@ -31,8 +29,6 @@ import backtype.storm.tuple.Fields;
import com.alibaba.jstorm.cache.KvStoreIterable;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.transactional.TransactionTopologyBuilder;
-import com.alibaba.jstorm.utils.JStormUtils;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.jstorm.serialization.ImmutableListSerializer;
@@ -54,12 +50,9 @@ import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
import org.apache.beam.runners.jstorm.translation.translator.Stream;
import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,204 +63,218 @@ import org.slf4j.LoggerFactory;
public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
- private static final Logger LOG = LoggerFactory.getLogger(JStormRunner.class);
-
- private JStormPipelineOptions options;
-
- public JStormRunner(JStormPipelineOptions options) {
- this.options = options;
+ private static final Logger LOG = LoggerFactory.getLogger(JStormRunner.class);
+
+ private JStormPipelineOptions options;
+
+ public JStormRunner(JStormPipelineOptions options) {
+ this.options = options;
+ }
+
+ public static JStormRunner fromOptions(PipelineOptions options) {
+ JStormPipelineOptions pipelineOptions = PipelineOptionsValidator.validate(
+ JStormPipelineOptions.class, options);
+ return new JStormRunner(pipelineOptions);
+ }
+
+ /**
+ * convert pipeline options to storm configuration format
+ *
+ * @param options
+ * @return
+ */
+ private Config convertPipelineOptionsToConfig(JStormPipelineOptions options) {
+ Config config = new Config();
+ if (options.getLocalMode())
+ config.put(Config.STORM_CLUSTER_MODE, "local");
+ else
+ config.put(Config.STORM_CLUSTER_MODE, "distributed");
+
+ Config.setNumWorkers(config, options.getWorkerNumber());
+
+ config.putAll(options.getTopologyConfig());
+
+ // Setup config for runtime env
+ config.put("worker.external", "beam");
+ config.put("topology.acker.executors", 0);
+
+ UnmodifiableCollectionsSerializer.registerSerializers(config);
+ // register classes of guava utils, ImmutableList, ImmutableSet, ImmutableMap
+ ImmutableListSerializer.registerSerializers(config);
+ SdkRepackImmuListSerializer.registerSerializers(config);
+ ImmutableSetSerializer.registerSerializers(config);
+ SdkRepackImmuSetSerializer.registerSerializers(config);
+ ImmutableMapSerializer.registerSerializers(config);
+
+ config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class);
+ return config;
+ }
+
+ @Override
+ public JStormRunnerResult run(Pipeline pipeline) {
+ LOG.info("Running pipeline...");
+ TranslationContext context = new TranslationContext(this.options);
+ StormPipelineTranslator transformer = new StormPipelineTranslator(context);
+ transformer.translate(pipeline);
+ LOG.info("UserGraphContext=\n{}", context.getUserGraphContext());
+ LOG.info("ExecutionGraphContext=\n{}", context.getExecutionGraphContext());
+
+ for (Stream stream : context.getExecutionGraphContext().getStreams()) {
+ LOG.info(
+ stream.getProducer().getComponentId() + " --> " + stream.getConsumer().getComponentId());
}
- public static JStormRunner fromOptions(PipelineOptions options) {
- JStormPipelineOptions pipelineOptions = PipelineOptionsValidator.validate(JStormPipelineOptions.class, options);
- return new JStormRunner(pipelineOptions);
+ String topologyName = options.getJobName();
+ Config config = convertPipelineOptionsToConfig(options);
+
+ return runTopology(
+ topologyName,
+ getTopology(options, context.getExecutionGraphContext()),
+ config);
+ }
+
+ private JStormRunnerResult runTopology(
+ String topologyName,
+ StormTopology topology,
+ Config config) {
+ try {
+ if (StormConfig.local_mode(config)) {
+ LocalCluster localCluster = LocalCluster.getInstance();
+ localCluster.submitTopology(topologyName, config, topology);
+ return JStormRunnerResult.local(
+ topologyName, config, localCluster, options.getLocalModeExecuteTime());
+ } else {
+ StormSubmitter.submitTopology(topologyName, config, topology);
+ return null;
+ }
+ } catch (Exception e) {
+ LOG.warn("Fail to submit topology", e);
+ throw new RuntimeException("Fail to submit topology", e);
}
-
- /**
- * convert pipeline options to storm configuration format
- * @param options
- * @return
- */
- private Config convertPipelineOptionsToConfig(JStormPipelineOptions options) {
- Config config = new Config();
- if (options.getLocalMode())
- config.put(Config.STORM_CLUSTER_MODE, "local");
- else
- config.put(Config.STORM_CLUSTER_MODE, "distributed");
-
- Config.setNumWorkers(config, options.getWorkerNumber());
-
- config.putAll(options.getTopologyConfig());
-
- // Setup config for runtime env
- config.put("worker.external", "beam");
- config.put("topology.acker.executors", 0);
-
- UnmodifiableCollectionsSerializer.registerSerializers(config);
- // register classes of guava utils, ImmutableList, ImmutableSet, ImmutableMap
- ImmutableListSerializer.registerSerializers(config);
- SdkRepackImmuListSerializer.registerSerializers(config);
- ImmutableSetSerializer.registerSerializers(config);
- SdkRepackImmuSetSerializer.registerSerializers(config);
- ImmutableMapSerializer.registerSerializers(config);
-
- config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class);
- return config;
+ }
+
+ private AbstractComponent getComponent(
+ String id, TranslationContext.ExecutionGraphContext context) {
+ AbstractComponent component = null;
+ AdaptorBasicSpout spout = context.getSpout(id);
+ if (spout != null) {
+ component = spout;
+ } else {
+ AdaptorBasicBolt bolt = context.getBolt(id);
+ if (bolt != null)
+ component = bolt;
}
- @Override
- public JStormRunnerResult run(Pipeline pipeline) {
- LOG.info("Running pipeline...");
- TranslationContext context = new TranslationContext(this.options);
- StormPipelineTranslator transformer = new StormPipelineTranslator(context);
- transformer.translate(pipeline);
- LOG.info("UserGraphContext=\n{}", context.getUserGraphContext());
- LOG.info("ExecutionGraphContext=\n{}", context.getExecutionGraphContext());
-
- for (Stream stream : context.getExecutionGraphContext().getStreams()) {
- LOG.info(stream.getProducer().getComponentId() + " --> " + stream.getConsumer().getComponentId());
- }
+ return component;
+ }
- String topologyName = options.getJobName();
- Config config = convertPipelineOptionsToConfig(options);
+ private StormTopology getTopology(
+ JStormPipelineOptions options, TranslationContext.ExecutionGraphContext context) {
+ boolean isExactlyOnce = options.getExactlyOnceTopology();
+ TopologyBuilder builder =
+ isExactlyOnce ? new TransactionTopologyBuilder() : new TopologyBuilder();
- return runTopology(
- topologyName,
- getTopology(options, context.getExecutionGraphContext()),
- config);
+ int parallelismNumber = options.getParallelismNumber();
+ Map<String, AdaptorBasicSpout> spouts = context.getSpouts();
+ for (String id : spouts.keySet()) {
+ IRichSpout spout = getSpout(isExactlyOnce, spouts.get(id));
+ builder.setSpout(id, spout, getParallelismNum(spouts.get(id), parallelismNumber));
}
- private JStormRunnerResult runTopology(String topologyName, StormTopology topology, Config config) {
- try {
- if (StormConfig.local_mode(config)) {
- LocalCluster localCluster = LocalCluster.getInstance();
- localCluster.submitTopology(topologyName, config, topology);
- return JStormRunnerResult.local(
- topologyName, config, localCluster, options.getLocalModeExecuteTime());
- } else {
- StormSubmitter.submitTopology(topologyName, config, topology);
- return null;
- }
- } catch (Exception e) {
- LOG.warn("Fail to submit topology", e);
- throw new RuntimeException("Fail to submit topology", e);
- }
+ HashMap<String, BoltDeclarer> declarers = new HashMap<>();
+ Iterable<Stream> streams = context.getStreams();
+ LOG.info("streams=" + streams);
+ for (Stream stream : streams) {
+ String destBoltId = stream.getConsumer().getComponentId();
+ IRichBolt bolt = getBolt(isExactlyOnce, context.getBolt(destBoltId));
+ BoltDeclarer declarer = declarers.get(destBoltId);
+ if (declarer == null) {
+ declarer = builder.setBolt(
+ destBoltId,
+ bolt,
+ getParallelismNum(context.getBolt(destBoltId), parallelismNumber));
+ declarers.put(destBoltId, declarer);
+ }
+
+ Stream.Grouping grouping = stream.getConsumer().getGrouping();
+ String streamId = stream.getProducer().getStreamId();
+ String srcBoltId = stream.getProducer().getComponentId();
+
+ // add stream output declare for "from" component
+ AbstractComponent component = getComponent(srcBoltId, context);
+ if (grouping.getType().equals(Stream.Grouping.Type.FIELDS))
+ component.addKVOutputField(streamId);
+ else
+ component.addOutputField(streamId);
+
+ // "to" component declares grouping to "from" component
+ switch (grouping.getType()) {
+ case SHUFFLE:
+ declarer.shuffleGrouping(srcBoltId, streamId);
+ break;
+ case FIELDS:
+ declarer.fieldsGrouping(srcBoltId, streamId, new Fields(grouping.getFields()));
+ break;
+ case ALL:
+ declarer.allGrouping(srcBoltId, streamId);
+ break;
+ case DIRECT:
+ declarer.directGrouping(srcBoltId, streamId);
+ break;
+ case GLOBAL:
+ declarer.globalGrouping(srcBoltId, streamId);
+ break;
+ case LOCAL_OR_SHUFFLE:
+ declarer.localOrShuffleGrouping(srcBoltId, streamId);
+ break;
+ case NONE:
+ declarer.noneGrouping(srcBoltId, streamId);
+ break;
+ default:
+ throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
+ }
+
+ // Subscribe grouping of water mark stream
+ component.addOutputField(CommonInstance.BEAM_WATERMARK_STREAM_ID);
+ declarer.allGrouping(srcBoltId, CommonInstance.BEAM_WATERMARK_STREAM_ID);
}
- private AbstractComponent getComponent(String id, TranslationContext.ExecutionGraphContext context) {
- AbstractComponent component = null;
- AdaptorBasicSpout spout = context.getSpout(id);
- if (spout != null) {
- component = spout;
- } else {
- AdaptorBasicBolt bolt = context.getBolt(id);
- if (bolt != null)
- component = bolt;
- }
-
- return component;
+ if (isExactlyOnce) {
+ ((TransactionTopologyBuilder) builder).enableHdfs();
}
-
- private StormTopology getTopology(JStormPipelineOptions options, TranslationContext.ExecutionGraphContext context) {
- boolean isExactlyOnce = options.getExactlyOnceTopology();
- TopologyBuilder builder = isExactlyOnce ? new TransactionTopologyBuilder() : new TopologyBuilder();
-
- int parallelismNumber = options.getParallelismNumber();
- Map<String, AdaptorBasicSpout> spouts = context.getSpouts();
- for (String id : spouts.keySet()) {
- IRichSpout spout = getSpout(isExactlyOnce, spouts.get(id));
- builder.setSpout(id, spout, getParallelismNum(spouts.get(id), parallelismNumber));
- }
-
- HashMap<String, BoltDeclarer> declarers = new HashMap<>();
- Iterable<Stream> streams = context.getStreams();
- LOG.info("streams=" + streams);
- for (Stream stream : streams) {
- String destBoltId = stream.getConsumer().getComponentId();
- IRichBolt bolt = getBolt(isExactlyOnce, context.getBolt(destBoltId));
- BoltDeclarer declarer = declarers.get(destBoltId);
- if (declarer == null) {
- declarer = builder.setBolt(destBoltId, bolt,
- getParallelismNum(context.getBolt(destBoltId), parallelismNumber));
- declarers.put(destBoltId, declarer);
- }
-
- Stream.Grouping grouping = stream.getConsumer().getGrouping();
- String streamId = stream.getProducer().getStreamId();
- String srcBoltId = stream.getProducer().getComponentId();
-
- // add stream output declare for "from" component
- AbstractComponent component = getComponent(srcBoltId, context);
- if (grouping.getType().equals(Stream.Grouping.Type.FIELDS))
- component.addKVOutputField(streamId);
- else
- component.addOutputField(streamId);
-
- // "to" component declares grouping to "from" component
- switch (grouping.getType()) {
- case SHUFFLE:
- declarer.shuffleGrouping(srcBoltId, streamId);
- break;
- case FIELDS:
- declarer.fieldsGrouping(srcBoltId, streamId, new Fields(grouping.getFields()));
- break;
- case ALL:
- declarer.allGrouping(srcBoltId, streamId);
- break;
- case DIRECT:
- declarer.directGrouping(srcBoltId, streamId);
- break;
- case GLOBAL:
- declarer.globalGrouping(srcBoltId, streamId);
- break;
- case LOCAL_OR_SHUFFLE:
- declarer.localOrShuffleGrouping(srcBoltId, streamId);
- break;
- case NONE:
- declarer.noneGrouping(srcBoltId, streamId);
- break;
- default:
- throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
- }
-
- // Subscribe grouping of water mark stream
- component.addOutputField(CommonInstance.BEAM_WATERMARK_STREAM_ID);
- declarer.allGrouping(srcBoltId, CommonInstance.BEAM_WATERMARK_STREAM_ID);
- }
-
- if (isExactlyOnce) {
- ((TransactionTopologyBuilder) builder).enableHdfs();
- }
- return builder.createTopology();
- }
-
- private IRichSpout getSpout(boolean isExactlyOnce, IRichSpout spout) {
- IRichSpout ret = null;
- if (isExactlyOnce) {
- if (spout instanceof UnboundedSourceSpout) {
- ret = new TxUnboundedSourceSpout((UnboundedSourceSpout) spout);
- } else {
- String error = String.format("The specified type(%s) is not supported in exactly once mode yet!", spout.getClass().toString());
- throw new RuntimeException(error);
- }
- } else {
- ret = spout;
- }
- return ret;
- }
-
- private IRichBolt getBolt(boolean isExactlyOnce, ExecutorsBolt bolt) {
- return isExactlyOnce ? new TxExecutorsBolt(bolt) : bolt;
- }
-
- /**
- * Calculate the final parallelism number according to the configured number and global number.
- * @param component
- * @param globalParallelismNum
- * @return final parallelism number for the specified component
- */
- private int getParallelismNum(AbstractComponent component, int globalParallelismNum) {
- int configParallelismNum = component.getParallelismNum();
- return configParallelismNum > 0 ? configParallelismNum : globalParallelismNum;
+ return builder.createTopology();
+ }
+
+ private IRichSpout getSpout(boolean isExactlyOnce, IRichSpout spout) {
+ IRichSpout ret = null;
+ if (isExactlyOnce) {
+ if (spout instanceof UnboundedSourceSpout) {
+ ret = new TxUnboundedSourceSpout((UnboundedSourceSpout) spout);
+ } else {
+ String error = String.format(
+ "The specified type(%s) is not supported in exactly once mode yet!",
+ spout.getClass().toString());
+ throw new RuntimeException(error);
+ }
+ } else {
+ ret = spout;
}
+ return ret;
+ }
+
+ private IRichBolt getBolt(boolean isExactlyOnce, ExecutorsBolt bolt) {
+ return isExactlyOnce ? new TxExecutorsBolt(bolt) : bolt;
+ }
+
+ /**
+ * Calculate the final parallelism number according to the configured number and global number.
+ *
+ * @param component
+ * @param globalParallelismNum
+ * @return final parallelism number for the specified component
+ */
+ private int getParallelismNum(AbstractComponent component, int globalParallelismNum) {
+ int configParallelismNum = component.getParallelismNum();
+ return configParallelismNum > 0 ? configParallelismNum : globalParallelismNum;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java
index 465236b..1b4d283 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java
@@ -29,27 +29,28 @@ import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
* {@link JStormRunner}.
*/
public class JStormRunnerRegistrar {
- private JStormRunnerRegistrar() {}
+ private JStormRunnerRegistrar() {
+ }
- /**
- * Register the {@link JStormPipelineOptions}.
- */
- @AutoService(PipelineOptionsRegistrar.class)
- public static class Options implements PipelineOptionsRegistrar {
- @Override
- public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
- return ImmutableList.<Class<? extends PipelineOptions>> of(JStormPipelineOptions.class);
- }
+ /**
+ * Register the {@link JStormPipelineOptions}.
+ */
+ @AutoService(PipelineOptionsRegistrar.class)
+ public static class Options implements PipelineOptionsRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>>of(JStormPipelineOptions.class);
}
+ }
- /**
- * Register the {@link JStormRunner}.
- */
- @AutoService(PipelineRunnerRegistrar.class)
- public static class Runner implements PipelineRunnerRegistrar {
- @Override
- public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
- return ImmutableList.<Class<? extends PipelineRunner<?>>> of(JStormRunner.class);
- }
+ /**
+ * Register the {@link JStormRunner}.
+ */
+ @AutoService(PipelineRunnerRegistrar.class)
+ public static class Runner implements PipelineRunnerRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+ return ImmutableList.<Class<? extends PipelineRunner<?>>>of(JStormRunner.class);
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
index e15ee6d..797c899 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
@@ -33,76 +33,76 @@ import org.joda.time.Duration;
*/
public abstract class JStormRunnerResult implements PipelineResult {
- public static JStormRunnerResult local(
+ public static JStormRunnerResult local(
+ String topologyName,
+ Config config,
+ LocalCluster localCluster,
+ long localModeExecuteTimeSecs) {
+ return new LocalStormPipelineResult(
+ topologyName, config, localCluster, localModeExecuteTimeSecs);
+ }
+
+ private final String topologyName;
+ private final Config config;
+
+ JStormRunnerResult(String topologyName, Config config) {
+ this.config = checkNotNull(config, "config");
+ this.topologyName = checkNotNull(topologyName, "topologyName");
+ }
+
+ public State getState() {
+ return null;
+ }
+
+ public Config getConfig() {
+ return config;
+ }
+
+ public String getTopologyName() {
+ return topologyName;
+ }
+
+ private static class LocalStormPipelineResult extends JStormRunnerResult {
+
+ private LocalCluster localCluster;
+ private long localModeExecuteTimeSecs;
+
+ LocalStormPipelineResult(
String topologyName,
Config config,
LocalCluster localCluster,
long localModeExecuteTimeSecs) {
- return new LocalStormPipelineResult(
- topologyName, config, localCluster, localModeExecuteTimeSecs);
- }
-
- private final String topologyName;
- private final Config config;
-
- JStormRunnerResult(String topologyName, Config config) {
- this.config = checkNotNull(config, "config");
- this.topologyName = checkNotNull(topologyName, "topologyName");
+ super(topologyName, config);
+ this.localCluster = checkNotNull(localCluster, "localCluster");
}
- public State getState() {
- return null;
+ @Override
+ public State cancel() throws IOException {
+ //localCluster.deactivate(getTopologyName());
+ localCluster.killTopology(getTopologyName());
+ localCluster.shutdown();
+ JStormUtils.sleepMs(1000);
+ return State.CANCELLED;
}
- public Config getConfig() {
- return config;
+ @Override
+ public State waitUntilFinish(Duration duration) {
+ return waitUntilFinish();
}
- public String getTopologyName() {
- return topologyName;
+ @Override
+ public State waitUntilFinish() {
+ JStormUtils.sleepMs(localModeExecuteTimeSecs * 1000);
+ try {
+ return cancel();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
- private static class LocalStormPipelineResult extends JStormRunnerResult {
-
- private LocalCluster localCluster;
- private long localModeExecuteTimeSecs;
-
- LocalStormPipelineResult(
- String topologyName,
- Config config,
- LocalCluster localCluster,
- long localModeExecuteTimeSecs) {
- super(topologyName, config);
- this.localCluster = checkNotNull(localCluster, "localCluster");
- }
-
- @Override
- public State cancel() throws IOException {
- //localCluster.deactivate(getTopologyName());
- localCluster.killTopology(getTopologyName());
- localCluster.shutdown();
- JStormUtils.sleepMs(1000);
- return State.CANCELLED;
- }
-
- @Override
- public State waitUntilFinish(Duration duration) {
- return waitUntilFinish();
- }
-
- @Override
- public State waitUntilFinish() {
- JStormUtils.sleepMs(localModeExecuteTimeSecs * 1000);
- try {
- return cancel();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public MetricResults metrics() {
- return null;
- }
+ @Override
+ public MetricResults metrics() {
+ return null;
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
index b7ff4eb..e27efc0 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
@@ -1,10 +1,19 @@
package org.apache.beam.runners.jstorm;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import avro.shaded.com.google.common.collect.Maps;
import com.alibaba.jstorm.common.metric.AsmMetric;
-import com.alibaba.jstorm.metric.*;
+import com.alibaba.jstorm.metric.AsmMetricRegistry;
+import com.alibaba.jstorm.metric.AsmWindow;
+import com.alibaba.jstorm.metric.JStormMetrics;
+import com.alibaba.jstorm.metric.MetaType;
+import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.utils.JStormUtils;
import com.google.common.base.Optional;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -12,109 +21,106 @@ import org.apache.beam.sdk.testing.PAssert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
/**
* Test JStorm runner.
*/
public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
- private static final Logger LOG = LoggerFactory.getLogger(TestJStormRunner.class);
-
- public static TestJStormRunner fromOptions(PipelineOptions options) {
- return new TestJStormRunner(options.as(JStormPipelineOptions.class));
+ private static final Logger LOG = LoggerFactory.getLogger(TestJStormRunner.class);
+
+ public static TestJStormRunner fromOptions(PipelineOptions options) {
+ return new TestJStormRunner(options.as(JStormPipelineOptions.class));
+ }
+
+ private final JStormRunner stormRunner;
+ private final JStormPipelineOptions options;
+
+ private TestJStormRunner(JStormPipelineOptions options) {
+ this.options = options;
+ Map conf = Maps.newHashMap();
+ //conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString());
+ options.setTopologyConfig(conf);
+ options.setLocalMode(true);
+ stormRunner = JStormRunner.fromOptions(checkNotNull(options, "options"));
+ }
+
+ @Override
+ public JStormRunnerResult run(Pipeline pipeline) {
+ JStormRunnerResult result = stormRunner.run(pipeline);
+
+ try {
+ int numberOfAssertions = PAssert.countAsserts(pipeline);
+
+ LOG.info("Running JStorm job {} with {} expected assertions.",
+ result.getTopologyName(), numberOfAssertions);
+ if (numberOfAssertions == 0) {
+ // If assert number is zero, wait 5 sec
+ JStormUtils.sleepMs(5000);
+ return result;
+ } else {
+ for (int i = 0; i < 40; ++i) {
+ Optional<Boolean> success = checkForPAssertSuccess(numberOfAssertions);
+ if (success.isPresent() && success.get()) {
+ return result;
+ } else if (success.isPresent() && !success.get()) {
+ throw new AssertionError("Failed assertion checks.");
+ } else {
+ JStormUtils.sleepMs(500);
+ }
+ }
+ LOG.info("Assertion checks timed out.");
+ throw new AssertionError("Assertion checks timed out.");
+ }
+ } finally {
+ clearPAssertCount();
+ cancel(result);
}
+ }
- private final JStormRunner stormRunner;
- private final JStormPipelineOptions options;
-
- private TestJStormRunner(JStormPipelineOptions options) {
- this.options = options;
- Map conf = Maps.newHashMap();
- //conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString());
- options.setTopologyConfig(conf);
- options.setLocalMode(true);
- stormRunner = JStormRunner.fromOptions(checkNotNull(options, "options"));
+ private Optional<Boolean> checkForPAssertSuccess(int expectedNumberOfAssertions) {
+ int successes = 0;
+ for (AsmMetric metric :
+ JStormMetrics.search(PAssert.SUCCESS_COUNTER, MetaType.TASK, MetricType.COUNTER)) {
+ successes += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue();
}
-
- @Override
- public JStormRunnerResult run(Pipeline pipeline) {
- JStormRunnerResult result = stormRunner.run(pipeline);
-
- try {
- int numberOfAssertions = PAssert.countAsserts(pipeline);
-
- LOG.info("Running JStorm job {} with {} expected assertions.", result.getTopologyName(), numberOfAssertions);
- if(numberOfAssertions == 0) {
- // If assert number is zero, wait 5 sec
- JStormUtils.sleepMs(5000);
- return result;
- } else {
- for (int i = 0; i < 40; ++i) {
- Optional<Boolean> success = checkForPAssertSuccess(numberOfAssertions);
- if (success.isPresent() && success.get()) {
- return result;
- } else if (success.isPresent() && !success.get()) {
- throw new AssertionError("Failed assertion checks.");
- } else {
- JStormUtils.sleepMs(500);
- }
- }
- LOG.info("Assertion checks timed out.");
- throw new AssertionError("Assertion checks timed out.");
- }
- } finally {
- clearPAssertCount();
- cancel(result);
- }
+ int failures = 0;
+ for (AsmMetric metric :
+ JStormMetrics.search(PAssert.FAILURE_COUNTER, MetaType.TASK, MetricType.COUNTER)) {
+ failures += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue();
}
- private Optional<Boolean> checkForPAssertSuccess(int expectedNumberOfAssertions) {
- int successes = 0;
- for (AsmMetric metric : JStormMetrics.search(PAssert.SUCCESS_COUNTER, MetaType.TASK, MetricType.COUNTER)) {
- successes += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue();
- }
- int failures = 0;
- for (AsmMetric metric : JStormMetrics.search(PAssert.FAILURE_COUNTER, MetaType.TASK, MetricType.COUNTER)) {
- failures += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue();
- }
-
- if (failures > 0) {
- LOG.info("Found {} success, {} failures out of {} expected assertions.",
- successes, failures, expectedNumberOfAssertions);
- return Optional.of(false);
- } else if (successes >= expectedNumberOfAssertions) {
- LOG.info("Found {} success, {} failures out of {} expected assertions.",
- successes, failures, expectedNumberOfAssertions);
- return Optional.of(true);
- }
-
- LOG.info("Found {} success, {} failures out of {} expected assertions.",
- successes, failures, expectedNumberOfAssertions);
- return Optional.absent();
+ if (failures > 0) {
+ LOG.info("Found {} success, {} failures out of {} expected assertions.",
+ successes, failures, expectedNumberOfAssertions);
+ return Optional.of(false);
+ } else if (successes >= expectedNumberOfAssertions) {
+ LOG.info("Found {} success, {} failures out of {} expected assertions.",
+ successes, failures, expectedNumberOfAssertions);
+ return Optional.of(true);
}
- private void clearPAssertCount() {
- String topologyName = options.getJobName();
- AsmMetricRegistry taskMetrics = JStormMetrics.getTaskMetrics();
- Iterator<Map.Entry<String, AsmMetric>> itr = taskMetrics.getMetrics().entrySet().iterator();
- while (itr.hasNext()) {
- Map.Entry<String, AsmMetric> metric = itr.next();
- if (metric.getKey().contains(topologyName)) {
- itr.remove();
- }
- }
+ LOG.info("Found {} success, {} failures out of {} expected assertions.",
+ successes, failures, expectedNumberOfAssertions);
+ return Optional.absent();
+ }
+
+ private void clearPAssertCount() {
+ String topologyName = options.getJobName();
+ AsmMetricRegistry taskMetrics = JStormMetrics.getTaskMetrics();
+ Iterator<Map.Entry<String, AsmMetric>> itr = taskMetrics.getMetrics().entrySet().iterator();
+ while (itr.hasNext()) {
+ Map.Entry<String, AsmMetric> metric = itr.next();
+ if (metric.getKey().contains(topologyName)) {
+ itr.remove();
+ }
}
+ }
- private void cancel(JStormRunnerResult result) {
- try {
- result.cancel();
- } catch (IOException e) {
- throw new RuntimeException("Failed to cancel.", e);
-}
+ private void cancel(JStormRunnerResult result) {
+ try {
+ result.cancel();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to cancel.", e);
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
index aa7d325..fa4eeb6 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
@@ -1,92 +1,108 @@
package org.apache.beam.runners.jstorm.serialization;
import backtype.storm.Config;
-import org.apache.beam.runners.jstorm.util.RunnerUtils;
import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-import com.google.common.collect.*;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableTable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Table;
+import org.apache.beam.runners.jstorm.util.RunnerUtils;
public class ImmutableListSerializer extends Serializer<ImmutableList<Object>> {
- private static final boolean DOES_NOT_ACCEPT_NULL = false;
- private static final boolean IMMUTABLE = true;
+ private static final boolean DOES_NOT_ACCEPT_NULL = false;
+ private static final boolean IMMUTABLE = true;
- public ImmutableListSerializer() {
- super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
- }
+ public ImmutableListSerializer() {
+ super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+ }
- @Override
- public void write(Kryo kryo, Output output, ImmutableList<Object> object) {
- output.writeInt(object.size(), true);
- for (Object elm : object) {
- kryo.writeClassAndObject(output, elm);
- }
+ @Override
+ public void write(Kryo kryo, Output output, ImmutableList<Object> object) {
+ output.writeInt(object.size(), true);
+ for (Object elm : object) {
+ kryo.writeClassAndObject(output, elm);
}
+ }
- @Override
- public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) {
- final int size = input.readInt(true);
- final Object[] list = new Object[size];
- for (int i = 0; i < size; ++i) {
- list[i] = kryo.readClassAndObject(input);
- }
- return ImmutableList.copyOf(list);
+ @Override
+ public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) {
+ final int size = input.readInt(true);
+ final Object[] list = new Object[size];
+ for (int i = 0; i < size; ++i) {
+ list[i] = kryo.readClassAndObject(input);
}
+ return ImmutableList.copyOf(list);
+ }
- /**
- * Creates a new {@link ImmutableListSerializer} and registers its serializer
- * for the several ImmutableList related classes.
- */
- public static void registerSerializers(Config config) {
+ /**
+ * Creates a new {@link ImmutableListSerializer} and registers its serializer
+ * for the several ImmutableList related classes.
+ */
+ public static void registerSerializers(Config config) {
- // ImmutableList (abstract class)
- // +- RegularImmutableList
- // | RegularImmutableList
- // +- SingletonImmutableList
- // | Optimized for List with only 1 element.
- // +- SubList
- // | Representation for part of ImmutableList
- // +- ReverseImmutableList
- // | For iterating in reverse order
- // +- StringAsImmutableList
- // | Used by Lists#charactersOf
- // +- Values (ImmutableTable values)
- // Used by return value of #values() when there are multiple cells
+ // ImmutableList (abstract class)
+ // +- RegularImmutableList
+ // | RegularImmutableList
+ // +- SingletonImmutableList
+ // | Optimized for List with only 1 element.
+ // +- SubList
+ // | Representation for part of ImmutableList
+ // +- ReverseImmutableList
+ // | For iterating in reverse order
+ // +- StringAsImmutableList
+ // | Used by Lists#charactersOf
+ // +- Values (ImmutableTable values)
+ // Used by return value of #values() when there are multiple cells
- config.registerSerialization(ImmutableList.class, ImmutableListSerializer.class);
- config.registerSerialization(
- RunnerUtils.getBeamSdkRepackClass(ImmutableList.class), ImmutableListSerializer.class);
+ config.registerSerialization(ImmutableList.class, ImmutableListSerializer.class);
+ config.registerSerialization(
+ RunnerUtils.getBeamSdkRepackClass(ImmutableList.class), ImmutableListSerializer.class);
- // Note:
- // Only registering above is good enough for serializing/deserializing.
- // but if using Kryo#copy, following is required.
+ // Note:
+ // Only registering above is good enough for serializing/deserializing.
+ // but if using Kryo#copy, following is required.
- config.registerSerialization(ImmutableList.of().getClass(), ImmutableListSerializer.class);
- config.registerSerialization(
- RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().getClass()), ImmutableListSerializer.class);
- config.registerSerialization(ImmutableList.of(1).getClass(), ImmutableListSerializer.class);
- config.registerSerialization(
- RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1).getClass()), ImmutableListSerializer.class);
- config.registerSerialization(ImmutableList.of(1,2,3).subList(1, 2).getClass(), ImmutableListSerializer.class);
- config.registerSerialization(
- RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1,2,3).subList(1, 2).getClass()), ImmutableListSerializer.class);
- config.registerSerialization(ImmutableList.of().reverse().getClass(), ImmutableListSerializer.class);
- config.registerSerialization(
- RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().reverse().getClass()), ImmutableListSerializer.class);
+ config.registerSerialization(ImmutableList.of().getClass(), ImmutableListSerializer.class);
+ config.registerSerialization(
+ RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().getClass()),
+ ImmutableListSerializer.class);
+ config.registerSerialization(ImmutableList.of(1).getClass(), ImmutableListSerializer.class);
+ config.registerSerialization(
+ RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1).getClass()),
+ ImmutableListSerializer.class);
+ config.registerSerialization(
+ ImmutableList.of(1, 2, 3).subList(1, 2).getClass(),
+ ImmutableListSerializer.class);
+ config.registerSerialization(
+ RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1, 2, 3).subList(1, 2).getClass()),
+ ImmutableListSerializer.class);
+ config.registerSerialization(
+ ImmutableList.of().reverse().getClass(),
+ ImmutableListSerializer.class);
+ config.registerSerialization(
+ RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().reverse().getClass()),
+ ImmutableListSerializer.class);
- config.registerSerialization(Lists.charactersOf("KryoRocks").getClass(), ImmutableListSerializer.class);
- config.registerSerialization(
- RunnerUtils.getBeamSdkRepackClass(Lists.charactersOf("KryoRocks").getClass()), ImmutableListSerializer.class);
+ config.registerSerialization(
+ Lists.charactersOf("KryoRocks").getClass(),
+ ImmutableListSerializer.class);
+ config.registerSerialization(
+ RunnerUtils.getBeamSdkRepackClass(Lists.charactersOf("KryoRocks").getClass()),
+ ImmutableListSerializer.class);
- Table<Integer,Integer,Integer> baseTable = HashBasedTable.create();
- baseTable.put(1, 2, 3);
- baseTable.put(4, 5, 6);
- Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable);
- config.registerSerialization(table.values().getClass(), ImmutableListSerializer.class);
- config.registerSerialization(
- RunnerUtils.getBeamSdkRepackClass(table.values().getClass()), ImmutableListSerializer.class);
+ Table<Integer, Integer, Integer> baseTable = HashBasedTable.create();
+ baseTable.put(1, 2, 3);
+ baseTable.put(4, 5, 6);
+ Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable);
+ config.registerSerialization(table.values().getClass(), ImmutableListSerializer.class);
+ config.registerSerialization(
+ RunnerUtils.getBeamSdkRepackClass(table.values().getClass()),
+ ImmutableListSerializer.class);
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
index ee8b765..77eede3 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
@@ -7,55 +7,61 @@ import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
-
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
public class ImmutableMapSerializer extends Serializer<ImmutableMap<Object, ? extends Object>> {
- private static final boolean DOES_NOT_ACCEPT_NULL = true;
- private static final boolean IMMUTABLE = true;
+ private static final boolean DOES_NOT_ACCEPT_NULL = true;
+ private static final boolean IMMUTABLE = true;
- public ImmutableMapSerializer() {
- super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
- }
+ public ImmutableMapSerializer() {
+ super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+ }
- @Override
- public void write(Kryo kryo, Output output, ImmutableMap<Object, ? extends Object> immutableMap) {
- kryo.writeObject(output, Maps.newHashMap(immutableMap));
- }
+ @Override
+ public void write(Kryo kryo, Output output, ImmutableMap<Object, ? extends Object> immutableMap) {
+ kryo.writeObject(output, Maps.newHashMap(immutableMap));
+ }
- @Override
- public ImmutableMap<Object, Object> read(Kryo kryo, Input input, Class<ImmutableMap<Object, ? extends Object>> type) {
- Map map = kryo.readObject(input, HashMap.class);
- return ImmutableMap.copyOf(map);
- }
+ @Override
+ public ImmutableMap<Object, Object> read(
+ Kryo kryo,
+ Input input,
+ Class<ImmutableMap<Object, ? extends Object>> type) {
+ Map map = kryo.readObject(input, HashMap.class);
+ return ImmutableMap.copyOf(map);
+ }
- /**
- * Creates a new {@link ImmutableMapSerializer} and registers its serializer
- * for the several ImmutableMap related classes.
- */
- public static void registerSerializers(Config config) {
+ /**
+ * Creates a new {@link ImmutableMapSerializer} and registers its serializer
+ * for the several ImmutableMap related classes.
+ */
+ public static void registerSerializers(Config config) {
- config.registerSerialization(ImmutableMap.class, ImmutableMapSerializer.class);
- config.registerSerialization(ImmutableMap.of().getClass(), ImmutableMapSerializer.class);
+ config.registerSerialization(ImmutableMap.class, ImmutableMapSerializer.class);
+ config.registerSerialization(ImmutableMap.of().getClass(), ImmutableMapSerializer.class);
- Object o1 = new Object();
- Object o2 = new Object();
+ Object o1 = new Object();
+ Object o2 = new Object();
- config.registerSerialization(ImmutableMap.of(o1, o1).getClass(), ImmutableMapSerializer.class);
- config.registerSerialization(ImmutableMap.of(o1, o1, o2, o2).getClass(), ImmutableMapSerializer.class);
- Map<DummyEnum,Object> enumMap = new EnumMap<DummyEnum, Object>(DummyEnum.class);
- for (DummyEnum e : DummyEnum.values()) {
- enumMap.put(e, o1);
- }
-
- config.registerSerialization(ImmutableMap.copyOf(enumMap).getClass(), ImmutableMapSerializer.class);
+ config.registerSerialization(ImmutableMap.of(o1, o1).getClass(), ImmutableMapSerializer.class);
+ config.registerSerialization(
+ ImmutableMap.of(o1, o1, o2, o2).getClass(),
+ ImmutableMapSerializer.class);
+ Map<DummyEnum, Object> enumMap = new EnumMap<DummyEnum, Object>(DummyEnum.class);
+ for (DummyEnum e : DummyEnum.values()) {
+ enumMap.put(e, o1);
}
- private enum DummyEnum {
- VALUE1,
- VALUE2
- }
+ config.registerSerialization(
+ ImmutableMap.copyOf(enumMap).getClass(),
+ ImmutableMapSerializer.class);
+ }
+
+ private enum DummyEnum {
+ VALUE1,
+ VALUE2
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
index cdc4382..3a43b2b 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
@@ -10,62 +10,63 @@ import com.google.common.collect.Sets;
public class ImmutableSetSerializer extends Serializer<ImmutableSet<Object>> {
- private static final boolean DOES_NOT_ACCEPT_NULL = false;
- private static final boolean IMMUTABLE = true;
+ private static final boolean DOES_NOT_ACCEPT_NULL = false;
+ private static final boolean IMMUTABLE = true;
- public ImmutableSetSerializer() {
- super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
- }
+ public ImmutableSetSerializer() {
+ super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+ }
- @Override
- public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
- output.writeInt(object.size(), true);
- for (Object elm : object) {
- kryo.writeClassAndObject(output, elm);
- }
+ @Override
+ public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
+ output.writeInt(object.size(), true);
+ for (Object elm : object) {
+ kryo.writeClassAndObject(output, elm);
}
+ }
- @Override
- public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
- final int size = input.readInt(true);
- ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
- for (int i = 0; i < size; ++i) {
- builder.add(kryo.readClassAndObject(input));
- }
- return builder.build();
+ @Override
+ public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
+ final int size = input.readInt(true);
+ ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
+ for (int i = 0; i < size; ++i) {
+ builder.add(kryo.readClassAndObject(input));
}
+ return builder.build();
+ }
- /**
- * Creates a new {@link ImmutableSetSerializer} and registers its serializer
- * for the several ImmutableSet related classes.
- */
- public static void registerSerializers(Config config) {
+ /**
+ * Creates a new {@link ImmutableSetSerializer} and registers its serializer
+ * for the several ImmutableSet related classes.
+ */
+ public static void registerSerializers(Config config) {
- // ImmutableList (abstract class)
- // +- EmptyImmutableSet
- // | EmptyImmutableSet
- // +- SingletonImmutableSet
- // | Optimized for Set with only 1 element.
- // +- RegularImmutableSet
- // | RegularImmutableList
- // +- EnumImmutableSet
- // | EnumImmutableSet
+ // ImmutableSet (abstract class)
+ // +- EmptyImmutableSet
+ // | EmptyImmutableSet
+ // +- SingletonImmutableSet
+ // | Optimized for Set with only 1 element.
+ // +- RegularImmutableSet
+ // | RegularImmutableSet
+ // +- EnumImmutableSet
+ // | EnumImmutableSet
- config.registerSerialization(ImmutableSet.class, ImmutableSetSerializer.class);
+ config.registerSerialization(ImmutableSet.class, ImmutableSetSerializer.class);
- // Note:
- // Only registering above is good enough for serializing/deserializing.
- // but if using Kryo#copy, following is required.
+ // Note:
+ // Only registering above is good enough for serializing/deserializing.
+ // but if using Kryo#copy, following is required.
- config.registerSerialization(ImmutableSet.of().getClass(), ImmutableSetSerializer.class);
- config.registerSerialization(ImmutableSet.of(1).getClass(), ImmutableSetSerializer.class);
- config.registerSerialization(ImmutableSet.of(1,2,3).getClass(), ImmutableSetSerializer.class);
+ config.registerSerialization(ImmutableSet.of().getClass(), ImmutableSetSerializer.class);
+ config.registerSerialization(ImmutableSet.of(1).getClass(), ImmutableSetSerializer.class);
+ config.registerSerialization(ImmutableSet.of(1, 2, 3).getClass(), ImmutableSetSerializer.class);
- config.registerSerialization(
- Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(), ImmutableSetSerializer.class);
- }
+ config.registerSerialization(
+ Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(),
+ ImmutableSetSerializer.class);
+ }
- private enum SomeEnum {
- A, B, C
- }
+ private enum SomeEnum {
+ A, B, C
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
index decfb3f..b47f3b7 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
@@ -6,50 +6,49 @@ import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
import com.google.common.collect.Lists;
-
import java.util.Iterator;
import java.util.List;
public class KvStoreIterableSerializer extends Serializer<KvStoreIterable<Object>> {
- public KvStoreIterableSerializer() {
+ public KvStoreIterableSerializer() {
- }
+ }
- @Override
- public void write(Kryo kryo, Output output, KvStoreIterable<Object> object) {
- List<Object> values = Lists.newArrayList(object);
- output.writeInt(values.size(), true);
- for (Object elm : object) {
- kryo.writeClassAndObject(output, elm);
- }
+ @Override
+ public void write(Kryo kryo, Output output, KvStoreIterable<Object> object) {
+ List<Object> values = Lists.newArrayList(object);
+ output.writeInt(values.size(), true);
+ for (Object elm : object) {
+ kryo.writeClassAndObject(output, elm);
}
-
- @Override
- public KvStoreIterable<Object> read(Kryo kryo, Input input, Class<KvStoreIterable<Object>> type) {
- final int size = input.readInt(true);
- List<Object> values = Lists.newArrayList();
- for (int i = 0; i < size; ++i) {
- values.add(kryo.readClassAndObject(input));
- }
-
- return new KvStoreIterable<Object>() {
- Iterable<Object> values;
-
- @Override
- public Iterator<Object> iterator() {
- return values.iterator();
- }
-
- public KvStoreIterable init(Iterable<Object> values) {
- this.values = values;
- return this;
- }
-
- @Override
- public String toString() {
- return values.toString();
- }
- }.init(values);
+ }
+
+ @Override
+ public KvStoreIterable<Object> read(Kryo kryo, Input input, Class<KvStoreIterable<Object>> type) {
+ final int size = input.readInt(true);
+ List<Object> values = Lists.newArrayList();
+ for (int i = 0; i < size; ++i) {
+ values.add(kryo.readClassAndObject(input));
}
+
+ return new KvStoreIterable<Object>() {
+ Iterable<Object> values;
+
+ @Override
+ public Iterator<Object> iterator() {
+ return values.iterator();
+ }
+
+ public KvStoreIterable init(Iterable<Object> values) {
+ this.values = values;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return values.toString();
+ }
+ }.init(values);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
index 9bb315b..dd4272c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
@@ -6,73 +6,83 @@ import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-import org.apache.beam.sdk.repackaged.com.google.common.collect.*;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.HashBasedTable;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableTable;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.Table;
public class SdkRepackImmuListSerializer extends Serializer<ImmutableList<Object>> {
- private static final boolean DOES_NOT_ACCEPT_NULL = false;
- private static final boolean IMMUTABLE = true;
+ private static final boolean DOES_NOT_ACCEPT_NULL = false;
+ private static final boolean IMMUTABLE = true;
- public SdkRepackImmuListSerializer() {
- super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
- }
+ public SdkRepackImmuListSerializer() {
+ super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+ }
- @Override
- public void write(Kryo kryo, Output output, ImmutableList<Object> object) {
- output.writeInt(object.size(), true);
- for (Object elm : object) {
- kryo.writeClassAndObject(output, elm);
- }
+ @Override
+ public void write(Kryo kryo, Output output, ImmutableList<Object> object) {
+ output.writeInt(object.size(), true);
+ for (Object elm : object) {
+ kryo.writeClassAndObject(output, elm);
}
+ }
- @Override
- public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) {
- final int size = input.readInt(true);
- final Object[] list = new Object[size];
- for (int i = 0; i < size; ++i) {
- list[i] = kryo.readClassAndObject(input);
- }
- return ImmutableList.copyOf(list);
+ @Override
+ public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) {
+ final int size = input.readInt(true);
+ final Object[] list = new Object[size];
+ for (int i = 0; i < size; ++i) {
+ list[i] = kryo.readClassAndObject(input);
}
+ return ImmutableList.copyOf(list);
+ }
- /**
- * Creates a new {@link ImmutableListSerializer} and registers its serializer
- * for the several ImmutableList related classes.
- */
- public static void registerSerializers(Config config) {
+ /**
+ * Creates a new {@link SdkRepackImmuListSerializer} and registers its serializer
+ * for the several ImmutableList related classes.
+ */
+ public static void registerSerializers(Config config) {
- // ImmutableList (abstract class)
- // +- RegularImmutableList
- // | RegularImmutableList
- // +- SingletonImmutableList
- // | Optimized for List with only 1 element.
- // +- SubList
- // | Representation for part of ImmutableList
- // +- ReverseImmutableList
- // | For iterating in reverse order
- // +- StringAsImmutableList
- // | Used by Lists#charactersOf
- // +- Values (ImmutableTable values)
- // Used by return value of #values() when there are multiple cells
+ // ImmutableList (abstract class)
+ // +- RegularImmutableList
+ // | RegularImmutableList
+ // +- SingletonImmutableList
+ // | Optimized for List with only 1 element.
+ // +- SubList
+ // | Representation for part of ImmutableList
+ // +- ReverseImmutableList
+ // | For iterating in reverse order
+ // +- StringAsImmutableList
+ // | Used by Lists#charactersOf
+ // +- Values (ImmutableTable values)
+ // Used by return value of #values() when there are multiple cells
- config.registerSerialization(ImmutableList.class, SdkRepackImmuListSerializer.class);
+ config.registerSerialization(ImmutableList.class, SdkRepackImmuListSerializer.class);
- // Note:
- // Only registering above is good enough for serializing/deserializing.
- // but if using Kryo#copy, following is required.
+ // Note:
+ // Only registering above is good enough for serializing/deserializing.
+ // but if using Kryo#copy, following is required.
- config.registerSerialization(ImmutableList.of().getClass(), SdkRepackImmuListSerializer.class);
- config.registerSerialization(ImmutableList.of(1).getClass(), SdkRepackImmuListSerializer.class);
- config.registerSerialization(ImmutableList.of(1,2,3).subList(1, 2).getClass(), SdkRepackImmuListSerializer.class);
- config.registerSerialization(ImmutableList.of().reverse().getClass(), SdkRepackImmuListSerializer.class);
+ config.registerSerialization(ImmutableList.of().getClass(), SdkRepackImmuListSerializer.class);
+ config.registerSerialization(ImmutableList.of(1).getClass(), SdkRepackImmuListSerializer.class);
+ config.registerSerialization(
+ ImmutableList.of(1, 2, 3).subList(1, 2).getClass(),
+ SdkRepackImmuListSerializer.class);
+ config.registerSerialization(
+ ImmutableList.of().reverse().getClass(),
+ SdkRepackImmuListSerializer.class);
- config.registerSerialization(Lists.charactersOf("KryoRocks").getClass(), SdkRepackImmuListSerializer.class);
+ config.registerSerialization(
+ Lists.charactersOf("KryoRocks").getClass(),
+ SdkRepackImmuListSerializer.class);
- Table<Integer,Integer,Integer> baseTable = HashBasedTable.create();
- baseTable.put(1, 2, 3);
- baseTable.put(4, 5, 6);
- Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable);
- config.registerSerialization(table.values().getClass(), SdkRepackImmuListSerializer.class);
+ Table<Integer, Integer, Integer> baseTable = HashBasedTable.create();
+ baseTable.put(1, 2, 3);
+ baseTable.put(4, 5, 6);
+ Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable);
+ config.registerSerialization(table.values().getClass(), SdkRepackImmuListSerializer.class);
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
index a514645..6973c82 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
@@ -5,67 +5,71 @@ import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-import org.apache.beam.sdk.repackaged.com.google.common.collect.*;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableSet;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.Sets;
public class SdkRepackImmuSetSerializer extends Serializer<ImmutableSet<Object>> {
- private static final boolean DOES_NOT_ACCEPT_NULL = false;
- private static final boolean IMMUTABLE = true;
+ private static final boolean DOES_NOT_ACCEPT_NULL = false;
+ private static final boolean IMMUTABLE = true;
- public SdkRepackImmuSetSerializer() {
- super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
- }
+ public SdkRepackImmuSetSerializer() {
+ super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+ }
- @Override
- public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
- output.writeInt(object.size(), true);
- for (Object elm : object) {
- kryo.writeClassAndObject(output, elm);
- }
+ @Override
+ public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
+ output.writeInt(object.size(), true);
+ for (Object elm : object) {
+ kryo.writeClassAndObject(output, elm);
}
+ }
- @Override
- public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
- final int size = input.readInt(true);
- ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
- for (int i = 0; i < size; ++i) {
- builder.add(kryo.readClassAndObject(input));
- }
- return builder.build();
+ @Override
+ public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
+ final int size = input.readInt(true);
+ ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
+ for (int i = 0; i < size; ++i) {
+ builder.add(kryo.readClassAndObject(input));
}
+ return builder.build();
+ }
- /**
- * Creates a new {@link ImmutableSetSerializer} and registers its serializer
- * for the several ImmutableSet related classes.
- */
- public static void registerSerializers(Config config) {
+ /**
+ * Creates a new {@link SdkRepackImmuSetSerializer} and registers its serializer
+ * for the several ImmutableSet related classes.
+ */
+ public static void registerSerializers(Config config) {
- // ImmutableList (abstract class)
- // +- EmptyImmutableSet
- // | EmptyImmutableSet
- // +- SingletonImmutableSet
- // | Optimized for Set with only 1 element.
- // +- RegularImmutableSet
- // | RegularImmutableList
- // +- EnumImmutableSet
- // | EnumImmutableSet
+ // ImmutableSet (abstract class)
+ // +- EmptyImmutableSet
+ // | EmptyImmutableSet
+ // +- SingletonImmutableSet
+ // | Optimized for Set with only 1 element.
+ // +- RegularImmutableSet
+ // | RegularImmutableSet
+ // +- EnumImmutableSet
+ // | EnumImmutableSet
- config.registerSerialization(ImmutableSet.class, SdkRepackImmuSetSerializer.class);
+ config.registerSerialization(ImmutableSet.class, SdkRepackImmuSetSerializer.class);
- // Note:
- // Only registering above is good enough for serializing/deserializing.
- // but if using Kryo#copy, following is required.
+ // Note:
+ // Only registering above is good enough for serializing/deserializing.
+ // but if using Kryo#copy, following is required.
- config.registerSerialization(ImmutableSet.of().getClass(), SdkRepackImmuSetSerializer.class);
- config.registerSerialization(ImmutableSet.of(1).getClass(), SdkRepackImmuSetSerializer.class);
- config.registerSerialization(ImmutableSet.of(1,2,3).getClass(), SdkRepackImmuSetSerializer.class);
+ config.registerSerialization(ImmutableSet.of().getClass(), SdkRepackImmuSetSerializer.class);
+ config.registerSerialization(ImmutableSet.of(1).getClass(), SdkRepackImmuSetSerializer.class);
+ config.registerSerialization(
+ ImmutableSet.of(1, 2, 3).getClass(),
+ SdkRepackImmuSetSerializer.class);
- config.registerSerialization(
- Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(), SdkRepackImmuSetSerializer.class);
- }
+ config.registerSerialization(
+ Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(),
+ SdkRepackImmuSetSerializer.class);
+ }
- private enum SomeEnum {
- A, B, C
- }
+ private enum SomeEnum {
+ A, B, C
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
index c8b0138..bcee778 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
@@ -5,155 +5,177 @@ import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-
import java.lang.reflect.Field;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
public class UnmodifiableCollectionsSerializer extends Serializer<Object> {
- private static final Field SOURCE_COLLECTION_FIELD;
- private static final Field SOURCE_MAP_FIELD;
+ private static final Field SOURCE_COLLECTION_FIELD;
+ private static final Field SOURCE_MAP_FIELD;
- static {
- try {
- SOURCE_COLLECTION_FIELD = Class.forName("java.util.Collections$UnmodifiableCollection" )
- .getDeclaredField( "c" );
- SOURCE_COLLECTION_FIELD.setAccessible( true );
+ static {
+ try {
+ SOURCE_COLLECTION_FIELD = Class.forName("java.util.Collections$UnmodifiableCollection")
+ .getDeclaredField("c");
+ SOURCE_COLLECTION_FIELD.setAccessible(true);
- SOURCE_MAP_FIELD = Class.forName("java.util.Collections$UnmodifiableMap" )
- .getDeclaredField( "m" );
- SOURCE_MAP_FIELD.setAccessible( true );
- } catch ( final Exception e ) {
- throw new RuntimeException( "Could not access source collection" +
- " field in java.util.Collections$UnmodifiableCollection.", e );
- }
+ SOURCE_MAP_FIELD = Class.forName("java.util.Collections$UnmodifiableMap")
+ .getDeclaredField("m");
+ SOURCE_MAP_FIELD.setAccessible(true);
+ } catch (final Exception e) {
+ throw new RuntimeException("Could not access source collection"
+ + " field in java.util.Collections$UnmodifiableCollection.", e);
}
-
- @Override
- public Object read(final Kryo kryo, final Input input, final Class<Object> clazz) {
- final int ordinal = input.readInt( true );
- final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal];
- final Object sourceCollection = kryo.readClassAndObject( input );
- return unmodifiableCollection.create( sourceCollection );
+ }
+
+ @Override
+ public Object read(final Kryo kryo, final Input input, final Class<Object> clazz) {
+ final int ordinal = input.readInt(true);
+ final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal];
+ final Object sourceCollection = kryo.readClassAndObject(input);
+ return unmodifiableCollection.create(sourceCollection);
+ }
+
+ @Override
+ public void write(final Kryo kryo, final Output output, final Object object) {
+ try {
+ final UnmodifiableCollection unmodifiableCollection =
+ UnmodifiableCollection.valueOfType(object.getClass());
+ // the ordinal could be replaced by something else (e.g. an explicitly managed "id")
+ output.writeInt(unmodifiableCollection.ordinal(), true);
+ kryo.writeClassAndObject(output, unmodifiableCollection.sourceCollectionField.get(object));
+ } catch (final RuntimeException e) {
+ // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write...
+ // handles SerializationException specifically (resizing the buffer)...
+ throw e;
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
}
-
- @Override
- public void write(final Kryo kryo, final Output output, final Object object) {
- try {
- final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( object.getClass() );
- // the ordinal could be replaced by s.th. else (e.g. a explicitely managed "id")
- output.writeInt( unmodifiableCollection.ordinal(), true );
- kryo.writeClassAndObject( output, unmodifiableCollection.sourceCollectionField.get( object ) );
- } catch ( final RuntimeException e ) {
- // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write...
- // handles SerializationException specifically (resizing the buffer)...
- throw e;
- } catch ( final Exception e ) {
- throw new RuntimeException( e );
- }
+ }
+
+ @Override
+ public Object copy(Kryo kryo, Object original) {
+ try {
+ final UnmodifiableCollection unmodifiableCollection =
+ UnmodifiableCollection.valueOfType(original.getClass());
+ Object sourceCollectionCopy =
+ kryo.copy(unmodifiableCollection.sourceCollectionField.get(original));
+ return unmodifiableCollection.create(sourceCollectionCopy);
+ } catch (final RuntimeException e) {
+ // Don't eat and wrap RuntimeExceptions
+ throw e;
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
}
-
- @Override
- public Object copy(Kryo kryo, Object original) {
- try {
- final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( original.getClass() );
- Object sourceCollectionCopy = kryo.copy(unmodifiableCollection.sourceCollectionField.get(original));
- return unmodifiableCollection.create( sourceCollectionCopy );
- } catch ( final RuntimeException e ) {
- // Don't eat and wrap RuntimeExceptions
- throw e;
- } catch ( final Exception e ) {
- throw new RuntimeException( e );
- }
+ }
+
+ private static enum UnmodifiableCollection {
+ COLLECTION(
+ Collections.unmodifiableCollection(Arrays.asList("")).getClass(),
+ SOURCE_COLLECTION_FIELD) {
+ @Override
+ public Object create(final Object sourceCollection) {
+ return Collections.unmodifiableCollection((Collection<?>) sourceCollection);
+ }
+ },
+ RANDOM_ACCESS_LIST(
+ Collections.unmodifiableList(new ArrayList<Void>()).getClass(),
+ SOURCE_COLLECTION_FIELD) {
+ @Override
+ public Object create(final Object sourceCollection) {
+ return Collections.unmodifiableList((List<?>) sourceCollection);
+ }
+ },
+ LIST(Collections.unmodifiableList(new LinkedList<Void>()).getClass(), SOURCE_COLLECTION_FIELD) {
+ @Override
+ public Object create(final Object sourceCollection) {
+ return Collections.unmodifiableList((List<?>) sourceCollection);
+ }
+ },
+ SET(Collections.unmodifiableSet(new HashSet<Void>()).getClass(), SOURCE_COLLECTION_FIELD) {
+ @Override
+ public Object create(final Object sourceCollection) {
+ return Collections.unmodifiableSet((Set<?>) sourceCollection);
+ }
+ },
+ SORTED_SET(
+ Collections.unmodifiableSortedSet(new TreeSet<Void>()).getClass(),
+ SOURCE_COLLECTION_FIELD) {
+ @Override
+ public Object create(final Object sourceCollection) {
+ return Collections.unmodifiableSortedSet((SortedSet<?>) sourceCollection);
+ }
+ },
+ MAP(Collections.unmodifiableMap(new HashMap<Void, Void>()).getClass(), SOURCE_MAP_FIELD) {
+ @Override
+ public Object create(final Object sourceCollection) {
+ return Collections.unmodifiableMap((Map<?, ?>) sourceCollection);
+ }
+
+ },
+ SORTED_MAP(
+ Collections.unmodifiableSortedMap(new TreeMap<Void, Void>()).getClass(),
+ SOURCE_MAP_FIELD) {
+ @Override
+ public Object create(final Object sourceCollection) {
+ return Collections.unmodifiableSortedMap((SortedMap<?, ?>) sourceCollection);
+ }
+ };
+
+ private final Class<?> type;
+ private final Field sourceCollectionField;
+
+ private UnmodifiableCollection(final Class<?> type, final Field sourceCollectionField) {
+ this.type = type;
+ this.sourceCollectionField = sourceCollectionField;
}
- private static enum UnmodifiableCollection {
- COLLECTION( Collections.unmodifiableCollection( Arrays.asList( "" ) ).getClass(), SOURCE_COLLECTION_FIELD ){
- @Override
- public Object create( final Object sourceCollection ) {
- return Collections.unmodifiableCollection( (Collection<?>) sourceCollection );
- }
- },
- RANDOM_ACCESS_LIST( Collections.unmodifiableList( new ArrayList<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){
- @Override
- public Object create( final Object sourceCollection ) {
- return Collections.unmodifiableList( (List<?>) sourceCollection );
- }
- },
- LIST( Collections.unmodifiableList( new LinkedList<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){
- @Override
- public Object create( final Object sourceCollection ) {
- return Collections.unmodifiableList( (List<?>) sourceCollection );
- }
- },
- SET( Collections.unmodifiableSet( new HashSet<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){
- @Override
- public Object create( final Object sourceCollection ) {
- return Collections.unmodifiableSet( (Set<?>) sourceCollection );
- }
- },
- SORTED_SET( Collections.unmodifiableSortedSet( new TreeSet<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){
- @Override
- public Object create( final Object sourceCollection ) {
- return Collections.unmodifiableSortedSet( (SortedSet<?>) sourceCollection );
- }
- },
- MAP( Collections.unmodifiableMap( new HashMap<Void, Void>() ).getClass(), SOURCE_MAP_FIELD ) {
-
- @Override
- public Object create( final Object sourceCollection ) {
- return Collections.unmodifiableMap( (Map<?, ?>) sourceCollection );
- }
-
- },
- SORTED_MAP( Collections.unmodifiableSortedMap( new TreeMap<Void, Void>() ).getClass(), SOURCE_MAP_FIELD ) {
- @Override
- public Object create( final Object sourceCollection ) {
- return Collections.unmodifiableSortedMap( (SortedMap<?, ?>) sourceCollection );
- }
- };
-
- private final Class<?> type;
- private final Field sourceCollectionField;
-
- private UnmodifiableCollection( final Class<?> type, final Field sourceCollectionField ) {
- this.type = type;
- this.sourceCollectionField = sourceCollectionField;
- }
+ /**
+ * @param sourceCollection the backing collection or map to wrap in an unmodifiable view
+ */
+ public abstract Object create(Object sourceCollection);
- /**
- * @param sourceCollection
- */
- public abstract Object create( Object sourceCollection );
-
- static UnmodifiableCollection valueOfType(final Class<?> type ) {
- for( final UnmodifiableCollection item : values() ) {
- if ( item.type.equals( type ) ) {
- return item;
- }
- }
- throw new IllegalArgumentException( "The type " + type + " is not supported." );
+ static UnmodifiableCollection valueOfType(final Class<?> type) {
+ for (final UnmodifiableCollection item : values()) {
+ if (item.type.equals(type)) {
+ return item;
}
-
+ }
+ throw new IllegalArgumentException("The type " + type + " is not supported.");
}
- /**
- * Creates a new {@link UnmodifiableCollectionsSerializer} and registers its serializer
- * for the several unmodifiable Collections that can be created via {@link Collections},
- * including {@link Map}s.
- *
- * @see Collections#unmodifiableCollection(Collection)
- * @see Collections#unmodifiableList(List)
- * @see Collections#unmodifiableSet(Set)
- * @see Collections#unmodifiableSortedSet(SortedSet)
- * @see Collections#unmodifiableMap(Map)
- * @see Collections#unmodifiableSortedMap(SortedMap)
- */
- public static void registerSerializers( Config config ) {
- UnmodifiableCollection.values();
- for ( final UnmodifiableCollection item : UnmodifiableCollection.values() ) {
- config.registerSerialization( item.type, UnmodifiableCollectionsSerializer.class );
- }
+ }
+
+ /**
+ * Registers {@link UnmodifiableCollectionsSerializer} on the given config as the serializer
+ * for the several unmodifiable collections that can be created via {@link Collections},
+ * including {@link Map}s.
+ *
+ * @see Collections#unmodifiableCollection(Collection)
+ * @see Collections#unmodifiableList(List)
+ * @see Collections#unmodifiableSet(Set)
+ * @see Collections#unmodifiableSortedSet(SortedSet)
+ * @see Collections#unmodifiableMap(Map)
+ * @see Collections#unmodifiableSortedMap(SortedMap)
+ */
+ public static void registerSerializers(Config config) {
+ UnmodifiableCollection.values();
+ for (final UnmodifiableCollection item : UnmodifiableCollection.values()) {
+ config.registerSerialization(item.type, UnmodifiableCollectionsSerializer.class);
}
+ }
}