You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:08 UTC
[08/53] [abbrv] beam git commit: jstorm-runner: rename the package to
org.apache.beam.runners.jstorm.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslationContext.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslationContext.java
deleted file mode 100644
index c3e9805..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslationContext.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation;
-
-import avro.shaded.com.google.common.collect.Lists;
-import com.alibaba.jstorm.beam.translation.translator.Stream;
-import com.alibaba.jstorm.beam.util.RunnerUtils;
-import com.google.common.base.Strings;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.PValueBase;
-import org.apache.beam.sdk.values.TaggedPValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-import com.alibaba.jstorm.beam.translation.runtime.AdaptorBasicSpout;
-import com.alibaba.jstorm.beam.translation.runtime.Executor;
-import com.alibaba.jstorm.beam.translation.runtime.ExecutorsBolt;
-import com.alibaba.jstorm.beam.translation.util.CommonInstance;
-
-import java.util.*;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Maintains the state necessary during Pipeline translation to build a Storm topology.
- */
-public class TranslationContext {
- private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class);
-
- private final UserGraphContext userGraphContext;
- private final ExecutionGraphContext executionGraphContext;
-
- /**
-  * Creates a translation context with a fresh user-graph context for the given options
-  * and an empty execution-graph context.
-  *
-  * @param options pipeline options handed to the {@link UserGraphContext}
-  */
- public TranslationContext(StormPipelineOptions options) {
-     userGraphContext = new UserGraphContext(options);
-     executionGraphContext = new ExecutionGraphContext();
- }
-
- /**
-  * @return the context holding the spouts, bolts and streams registered so far
-  */
- public ExecutionGraphContext getExecutionGraphContext() {
-     return this.executionGraphContext;
- }
-
- /**
-  * @return the context describing the transform that is currently being translated
-  */
- public UserGraphContext getUserGraphContext() {
-     return this.userGraphContext;
- }
-
- /**
-  * Defines a stream from the component that produces {@code input} to
-  * {@code destComponentName}, unless both are the same component.
-  *
-  * <p>When the producing component is a registered bolt, the input tag is also added to
-  * that bolt as an external output tag.
-  *
-  * @param input the tagged value consumed by the destination component
-  * @param destComponentName id of the consuming component
-  * @param grouping grouping to use for the new stream
-  * @throws NullPointerException if no producer has been registered for the input value
-  */
- private void addStormStreamDef(
-         TaggedPValue input, String destComponentName, Stream.Grouping grouping) {
-     // getProducer() returns null for values nobody registered; fail with a message that
-     // names the value instead of an anonymous NullPointerException on the next line.
-     Stream.Producer producer = checkNotNull(
-             executionGraphContext.getProducer(input.getValue()),
-             "No producer registered for input %s consumed by component %s",
-             input.getValue().getName(),
-             destComponentName);
-
-     if (producer.getComponentId().equals(destComponentName)) {
-         // Producer and consumer live in the same component: no stream is needed.
-         return;
-     }
-
-     Stream.Consumer consumer = Stream.Consumer.of(destComponentName, grouping);
-     executionGraphContext.registerStreamConsumer(consumer, producer);
-
-     // getBolt() yields null when the producer is not a registered bolt (e.g. a spout).
-     ExecutorsBolt executorsBolt = executionGraphContext.getBolt(producer.getComponentId());
-     if (executorsBolt != null) {
-         executorsBolt.addExternalOutputTag(input.getTag());
-     }
- }
-
- /**
-  * Searches the inputs of the current transform for one that is produced by a
-  * registered bolt.
-  *
-  * @return the component id of the first such bolt, or {@code null} when none of the
-  *     inputs is produced by a registered bolt (for instance when the upstream component
-  *     is a spout)
-  */
- private String getUpstreamExecutorsBolt() {
-     for (PValue input : userGraphContext.getInputs().values()) {
-         String producerId = executionGraphContext.getProducerComponentId(input);
-         if (producerId == null || executionGraphContext.getBolt(producerId) == null) {
-             continue;
-         }
-         return producerId;
-     }
-     return null;
- }
-
- /**
-  * Checks whether the current transform is applied to a source collection.
-  *
-  * @return {@code true} if at least one input of the current transform is produced by a
-  *     registered spout
-  */
- private boolean connectedToSource() {
-     boolean fedBySpout = false;
-     for (PValue input : userGraphContext.getInputs().values()) {
-         if (executionGraphContext.producedBySpout(input)) {
-             fedBySpout = true;
-             break;
-         }
-     }
-     return fedBySpout;
- }
-
- /**
-  * Decides whether the executor must not be chained into the upstream bolt.
-  *
-  * @param upstreamExecutorsBolt the bolt producing the input of the current transform
-  * @param inputs the inputs of the current transform, keyed by tag
-  * @return {@code true} if there are multiple input streams, or if the upstream bolt
-  *     already has an executor registered under one of the input tags
-  */
- private boolean isMultipleInputOrOutput(ExecutorsBolt upstreamExecutorsBolt, Map<TupleTag<?>, PValue> inputs) {
-     if (inputs.size() > 1) {
-         return true;
-     }
-     // A non-empty intersection means another executor already consumes the same input.
-     return !Sets.intersection(upstreamExecutorsBolt.getExecutors().keySet(), inputs.keySet())
-             .isEmpty();
- }
-
- /**
-  * Adds an executor for the current transform, using the transform's own inputs and
-  * outputs and no side inputs.
-  *
-  * @param executor the executor to add
-  */
- public void addTransformExecutor(Executor executor) {
-     // Typed emptyList() instead of the raw EMPTY_LIST constant avoids an unchecked conversion.
-     addTransformExecutor(executor, Collections.<PValue>emptyList());
- }
-
- /**
-  * Adds an executor for the current transform, using the transform's own inputs and
-  * outputs.
-  *
-  * @param executor the executor to add
-  * @param sideInputs side inputs consumed by the executor
-  */
- public void addTransformExecutor(Executor executor, List<PValue> sideInputs) {
-     addTransformExecutor(
-             executor, userGraphContext.getInputs(), userGraphContext.getOutputs(), sideInputs);
- }
-
- /**
-  * Adds an executor with explicitly given inputs and outputs and no side inputs.
-  *
-  * @param executor the executor to add
-  * @param inputs inputs consumed by the executor, keyed by tag
-  * @param outputs outputs produced by the executor, keyed by tag
-  */
- public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs) {
-     addTransformExecutor(executor, inputs, outputs, Collections.<PValue>emptyList());
- }
-
- /**
-  * Places the executor into an ExecutorsBolt and wires up its streams.
-  *
-  * <p>In order, this method: picks (or creates) the bolt hosting the executor, registers
-  * that bolt as producer of every output, adds the executor to the bolt for every input
-  * and side-input tag, defines the streams feeding the bolt, and finally raises the bolt's
-  * parallelism if one is configured for the current transform.
-  *
-  * @param executor the executor to add
-  * @param inputs inputs consumed by the executor, keyed by tag
-  * @param outputs outputs produced by the executor, keyed by tag
-  * @param sideInputs side inputs consumed by the executor
-  */
- public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs, List<PValue> sideInputs) {
- String name = null;
-
- ExecutorsBolt bolt = null;
-
- boolean isGBK = false;
- /**
- * Check if the transform executor needs to be chained into an existing ExecutorsBolt.
- * In the following cases a new bolt is created for the specified executor; otherwise the
- * executor is added to the bolt that contains the corresponding upstream executor.
- * a) it is a GroupByKey executor
- * b) it is connected to a source directly
- * c) no existing upstream bolt was found
- * d) the upstream bolt reports multiple inputs or outputs (see isMultipleInputOrOutput);
- * a new bolt is created to reduce the side effects between multiple streams that are
- * output to the same executor.
- */
- if (RunnerUtils.isGroupByKeyExecutor(executor)) {
- bolt = new ExecutorsBolt();
- name = executionGraphContext.registerBolt(bolt);
- isGBK = true;
- } else if (connectedToSource()) {
- bolt = new ExecutorsBolt();
- name = executionGraphContext.registerBolt(bolt);
- } else {
- name = getUpstreamExecutorsBolt();
- if (name == null) {
- bolt = new ExecutorsBolt();
- name = executionGraphContext.registerBolt(bolt);
- } else {
- bolt = executionGraphContext.getBolt(name);
- if (isMultipleInputOrOutput(bolt, inputs)) {
- bolt = new ExecutorsBolt();
- name = executionGraphContext.registerBolt(bolt);
- }
- }
- }
-
- // Register the chosen bolt as the producer of every output of the current transform.
- for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
- TupleTag tag = entry.getKey();
- PValue value = entry.getValue();
-
- // For a PValueBase, use the first tag of its own expansion instead of the map key.
- if (value instanceof PValueBase) {
- tag = ((PValueBase) value).expand().keySet().iterator().next();
- }
- executionGraphContext.registerStreamProducer(
- TaggedPValue.of(tag, value),
- Stream.Producer.of(name, tag.getId(), value.getName()));
- //bolt.addOutputTags(tag);
- }
-
- // Add the transform executor into the chain of the ExecutorsBolt, once per input tag,
- // and define the stream that delivers each input to the bolt.
- for (Map.Entry<TupleTag<?>, PValue> entry : inputs.entrySet()) {
- TupleTag tag = entry.getKey();
- PValue value = entry.getValue();
- bolt.addExecutor(tag, executor);
-
- // filter all connections inside bolt
- //if (!bolt.getOutputTags().contains(tag)) {
- Stream.Grouping grouping;
- if (isGBK) {
- // GroupByKey inputs are grouped by the key field.
- grouping = Stream.Grouping.byFields(Arrays.asList(CommonInstance.KEY));
- } else {
- grouping = Stream.Grouping.of(Stream.Grouping.Type.LOCAL_OR_SHUFFLE);
- }
- // addStormStreamDef is a no-op when producer and consumer are the same component.
- addStormStreamDef(TaggedPValue.of(tag, value), name, grouping);
- //}
- }
-
- // Side inputs are delivered with the ALL grouping.
- for (PValue sideInput : sideInputs) {
- // NOTE(review): findTupleTag returns null for a value that was never recorded via
- // recordOutputTaggedPValue -- confirm callers guarantee the side input is known.
- TupleTag tag = userGraphContext.findTupleTag(sideInput);
- bolt.addExecutor(tag, executor);
- checkState(!bolt.getOutputTags().contains(tag));
- addStormStreamDef(TaggedPValue.of(tag, sideInput), name, Stream.Grouping.of(Stream.Grouping.Type.ALL));
- }
-
- bolt.registerExecutor(executor);
-
- // Set the parallelism: the first "/"-separated segment of the transform's full name is
- // looked up in the configured parallelism map, and the bolt keeps the larger of the
- // configured value and its current value.
- String pTransformfullName = userGraphContext.currentTransform.getFullName();
- String compositeName = pTransformfullName.split("/")[0];
- Map parallelismNumMap = userGraphContext.getOptions().getParallelismNumMap();
- if (parallelismNumMap.containsKey(compositeName)) {
- int configNum = (Integer) parallelismNumMap.get(compositeName);
- int currNum = bolt.getParallelismNum();
- bolt.setParallelismNum(Math.max(configNum, currNum));
- }
- }
-
- // TODO: add getSideInputs() and getSideOutputs().
- /**
-  * Holds the user-graph side of the translation: the pipeline options, the transform
-  * currently being translated, and the tags recorded for translated outputs.
-  *
-  * <p>All accessors that read inputs, outputs or the step name dereference the current
-  * transform, so {@link #setCurrentTransform} must have been called with a non-null
-  * transform before they are used.
-  */
- public static class UserGraphContext {
-     private final StormPipelineOptions options;
-     private final Map<PValue, TupleTag> pValueToTupleTag;
-     private AppliedPTransform<?, ?, ?> currentTransform = null;
-
-     private boolean isWindowed = false;
-
-     /**
-      * @param options pipeline options; must not be null
-      * @throws NullPointerException if {@code options} is null
-      */
-     public UserGraphContext(StormPipelineOptions options) {
-         this.options = checkNotNull(options, "options");
-         this.pValueToTupleTag = Maps.newHashMap();
-     }
-
-     /** @return the pipeline options given at construction time */
-     public StormPipelineOptions getOptions() {
-         return this.options;
-     }
-
-     /** Sets the transform that subsequent accessors refer to. */
-     public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
-         this.currentTransform = transform;
-     }
-
-     /** @return the full name of the current transform */
-     public String getStepName() {
-         return currentTransform.getFullName();
-     }
-
-     /** @return the first input value of the current transform, cast to the caller's type */
-     @SuppressWarnings("unchecked") // the caller chooses T; the cast cannot be checked here
-     public <T extends PValue> T getInput() {
-         return (T) currentTransform.getInputs().values().iterator().next();
-     }
-
-     /** @return all inputs of the current transform, keyed by tag */
-     public Map<TupleTag<?>, PValue> getInputs() {
-         return currentTransform.getInputs();
-     }
-
-     /** @return the tag of the first input of the current transform */
-     public TupleTag<?> getInputTag() {
-         return currentTransform.getInputs().keySet().iterator().next();
-     }
-
-     /** @return a new list holding the input tags of the current transform */
-     public List<TupleTag<?>> getInputTags() {
-         // Plain java.util.ArrayList: no need to go through Avro's shaded Guava copy.
-         return new ArrayList<TupleTag<?>>(currentTransform.getInputs().keySet());
-     }
-
-     /** @return the first output value of the current transform, cast to the caller's type */
-     @SuppressWarnings("unchecked") // the caller chooses T; the cast cannot be checked here
-     public <T extends PValue> T getOutput() {
-         return (T) currentTransform.getOutputs().values().iterator().next();
-     }
-
-     /** @return all outputs of the current transform, keyed by tag */
-     public Map<TupleTag<?>, PValue> getOutputs() {
-         return currentTransform.getOutputs();
-     }
-
-     /** @return the tag of the first output of the current transform */
-     public TupleTag<?> getOutputTag() {
-         return currentTransform.getOutputs().keySet().iterator().next();
-     }
-
-     /** @return a new list holding the output tags of the current transform */
-     public List<TupleTag<?>> getOutputTags() {
-         // Plain java.util.ArrayList: no need to go through Avro's shaded Guava copy.
-         return new ArrayList<TupleTag<?>>(currentTransform.getOutputs().keySet());
-     }
-
-     /** Remembers the tag of every output of the current transform, keyed by its value. */
-     public void recordOutputTaggedPValue() {
-         for (Map.Entry<TupleTag<?>, PValue> entry : getOutputs().entrySet()) {
-             pValueToTupleTag.put(entry.getValue(), entry.getKey());
-         }
-     }
-
-     /**
-      * Looks up the tag recorded for a value by {@link #recordOutputTaggedPValue()}.
-      *
-      * @param pValue the value to look up; must not be null
-      * @return the recorded tag, or {@code null} if the value was never recorded
-      * @throws NullPointerException if {@code pValue} is null
-      */
-     @SuppressWarnings("unchecked") // tags are stored raw; the caller chooses T
-     public <T> TupleTag<T> findTupleTag(PValue pValue) {
-         return pValueToTupleTag.get(checkNotNull(pValue, "pValue"));
-     }
-
-     /** Marks the pipeline as windowed; there is no way to unset the flag. */
-     public void setWindowed() {
-         this.isWindowed = true;
-     }
-
-     /** @return whether {@link #setWindowed()} has been called */
-     public boolean isWindowed() {
-         return this.isWindowed;
-     }
-
-     /** @return one "tagId == valueName" line per recorded output, joined by newlines */
-     @Override
-     public String toString() {
-         return Joiner.on('\n').join(FluentIterable.from(pValueToTupleTag.entrySet())
-                 .transform(new Function<Map.Entry<PValue, TupleTag>, String>() {
-                     @Override
-                     public String apply(Map.Entry<PValue, TupleTag> entry) {
-                         return String.format(
-                                 "%s == %s", entry.getValue().getId(), entry.getKey().getName());
-                     }
-                 }));
-     }
- }
-
- /**
-  * Holds the execution-graph side of the translation: the registered spouts and bolts,
-  * the producer of every value, and the streams connecting producers to consumers.
-  */
- public static class ExecutionGraphContext {
-
-     private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>();
-     private final Map<String, ExecutorsBolt> boltMap = new HashMap<>();
-
-     // One-to-one mapping between Stream.Producer and TaggedPValue (or PValue).
-     private final Map<PValue, Stream.Producer> pValueToProducer = new HashMap<>();
-     private final Map<Stream.Producer, TaggedPValue> producerToTaggedPValue = new HashMap<>();
-
-     private final List<Stream> streams = new ArrayList<>();
-
-     // Next numeric suffix for generated component names; shared by spouts and bolts.
-     private int id = 1;
-
-     /**
-      * Registers a spout under a generated "spoutN" name and records it as the producer
-      * of {@code output}.
-      *
-      * @throws NullPointerException if either argument is null
-      */
-     public void registerSpout(AdaptorBasicSpout spout, TaggedPValue output) {
-         checkNotNull(spout, "spout");
-         checkNotNull(output, "output");
-         final String spoutId = "spout" + genId();
-         spoutMap.put(spoutId, spout);
-         Stream.Producer producer =
-                 Stream.Producer.of(spoutId, output.getTag().getId(), output.getValue().getName());
-         registerStreamProducer(output, producer);
-     }
-
-     /** @return the spout registered under {@code id}, or null for a null, empty or unknown id */
-     public AdaptorBasicSpout getSpout(String id) {
-         return Strings.isNullOrEmpty(id) ? null : spoutMap.get(id);
-     }
-
-     /** @return the internal map of registered spouts, keyed by component name */
-     public Map<String, AdaptorBasicSpout> getSpouts() {
-         return spoutMap;
-     }
-
-     /**
-      * Registers a bolt under a generated "boltN" name.
-      *
-      * @return the generated component name
-      * @throws NullPointerException if {@code bolt} is null
-      */
-     public String registerBolt(ExecutorsBolt bolt) {
-         checkNotNull(bolt, "bolt");
-         final String boltId = "bolt" + genId();
-         boltMap.put(boltId, bolt);
-         return boltId;
-     }
-
-     /** @return the bolt registered under {@code id}, or null for a null, empty or unknown id */
-     public ExecutorsBolt getBolt(String id) {
-         return Strings.isNullOrEmpty(id) ? null : boltMap.get(id);
-     }
-
-     /**
-      * Records {@code producer} as the producer of the given tagged value, in both
-      * directions of the mapping.
-      *
-      * @throws NullPointerException if either argument is null
-      */
-     public void registerStreamProducer(TaggedPValue taggedPValue, Stream.Producer producer) {
-         checkNotNull(taggedPValue, "taggedPValue");
-         checkNotNull(producer, "producer");
-         pValueToProducer.put(taggedPValue.getValue(), producer);
-         producerToTaggedPValue.put(producer, taggedPValue);
-     }
-
-     /**
-      * @return the producer recorded for {@code pValue}, or null if there is none
-      * @throws NullPointerException if {@code pValue} is null
-      */
-     public Stream.Producer getProducer(PValue pValue) {
-         PValue key = checkNotNull(pValue, "pValue");
-         return pValueToProducer.get(key);
-     }
-
-     /** @return the component id of the producer of {@code pValue}, or null if there is none */
-     public String getProducerComponentId(PValue pValue) {
-         Stream.Producer producer = getProducer(pValue);
-         if (producer == null) {
-             return null;
-         }
-         return producer.getComponentId();
-     }
-
-     /** @return true if the producer of {@code pValue} is a registered spout */
-     public boolean producedBySpout(PValue pValue) {
-         return getSpout(getProducerComponentId(pValue)) != null;
-     }
-
-     /**
-      * Adds a stream from {@code producer} to {@code consumer}.
-      *
-      * @throws NullPointerException if either argument is null
-      */
-     public void registerStreamConsumer(Stream.Consumer consumer, Stream.Producer producer) {
-         // The producer is checked first, matching the argument order of Stream.of.
-         Stream.Producer from = checkNotNull(producer, "producer");
-         Stream.Consumer to = checkNotNull(consumer, "consumer");
-         streams.add(Stream.of(from, to));
-     }
-
-     /** @return the internal map from value to producer */
-     public Map<PValue, Stream.Producer> getPValueToProducers() {
-         return pValueToProducer;
-     }
-
-     /** @return the streams registered so far, in registration order */
-     public Iterable<Stream> getStreams() {
-         return streams;
-     }
-
-     /** @return a multi-line dump listing the spouts, the bolts and the streams */
-     @Override
-     public String toString() {
-         List<String> lines = new ArrayList<>();
-
-         lines.add("SPOUT");
-         for (Map.Entry<String, AdaptorBasicSpout> spout : spoutMap.entrySet()) {
-             lines.add(spout.getKey() + ": " + spout.getValue().toString());
-         }
-
-         lines.add("BOLT");
-         for (Map.Entry<String, ExecutorsBolt> bolt : boltMap.entrySet()) {
-             lines.add(bolt.getKey() + ": " + bolt.getValue().toString());
-         }
-
-         lines.add("STREAM");
-         for (Stream stream : streams) {
-             Stream.Producer from = stream.getProducer();
-             lines.add(String.format(
-                     "%s@@%s ---> %s@@%s",
-                     from.getStreamId(),
-                     from.getComponentId(),
-                     stream.getConsumer().getGrouping(),
-                     stream.getConsumer().getComponentId()));
-         }
-
-         return Joiner.on("\n").join(lines);
-     }
-
-     // Hands out the current id and advances the counter by one.
-     private synchronized int genId() {
-         int current = id;
-         id = current + 1;
-         return current;
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslatorRegistry.java
deleted file mode 100644
index 5e92eea..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslatorRegistry.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation;
-
-import com.alibaba.jstorm.beam.translation.translator.*;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Lookup table mapping PTransform types to their associated TransformTranslator
- * implementations.
- */
-public class TranslatorRegistry {
-    private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class);
-
-    private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS = new HashMap<>();
-
-    static {
-        // Sources.
-        TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator());
-        TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator());
-        // TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
-        // TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
-
-        // ParDo.
-        TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator());
-        TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator());
-
-        // Windowing.
-        //TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>());
-        TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>());
-
-        TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator());
-
-        TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
-
-        TRANSLATORS.put(ViewTranslator.CreateJStormPCollectionView.class, new ViewTranslator());
-
-        /*
-         * Currently no translator is registered for combine and reshuffle, because those
-         * transforms are eventually mapped to GroupByKey and ParDo. So only the final
-         * transforms need to be translated. If any improvement is required, the composite
-         * transforms will be translated in the future.
-         */
-        // TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
-        // TRANSLATORS.put(Globally.class, new CombineGloballyTranslator());
-        // TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslator());
-    }
-
-    /**
-     * Looks up the translator registered for the exact class of the given transform.
-     *
-     * @param transform the transform to translate
-     * @return the registered translator, or {@code null} (after logging a warning) when the
-     *     transform's class has no registered translator
-     */
-    public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
-        final Class<?> transformClass = transform.getClass();
-        TransformTranslator<?> found = TRANSLATORS.get(transformClass);
-        if (found != null) {
-            return found;
-        }
-        LOG.warn("Unsupported operator={}", transformClass.getName());
-        return null;
-    }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AbstractComponent.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AbstractComponent.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AbstractComponent.java
deleted file mode 100644
index 876546d..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AbstractComponent.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.alibaba.jstorm.beam.translation.util.CommonInstance;
-
-import backtype.storm.topology.IComponent;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-/**
- * Lets callers add output stream definitions through method calls, rather than
- * hard-coding them in {@code declareOutputFields}.
- */
-public abstract class AbstractComponent implements IComponent {
-    // Fields declared for each output stream, keyed by stream id.
-    private Map<String, Fields> streamToFields = new HashMap<>();
-    // Whether each output stream was added as a key/value stream, keyed by stream id.
-    private Map<String, Boolean> keyStreams = new HashMap<>();
-    private int parallelismNum = 0;
-
-    /** Adds a non-keyed output stream that carries only the value field. */
-    public void addOutputField(String streamId) {
-        Fields valueOnly = new Fields(CommonInstance.VALUE);
-        addOutputField(streamId, valueOnly);
-    }
-
-    /** Adds a non-keyed output stream with the given fields. */
-    public void addOutputField(String streamId, Fields fields) {
-        streamToFields.put(streamId, fields);
-        keyStreams.put(streamId, Boolean.FALSE);
-    }
-
-    /** Adds a keyed output stream that carries the key field followed by the value field. */
-    public void addKVOutputField(String streamId) {
-        Fields keyAndValue = new Fields(CommonInstance.KEY, CommonInstance.VALUE);
-        streamToFields.put(streamId, keyAndValue);
-        keyStreams.put(streamId, Boolean.TRUE);
-    }
-
-    /** Declares every stream added so far on the given declarer. */
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for (String streamId : streamToFields.keySet()) {
-            declarer.declareStream(streamId, streamToFields.get(streamId));
-        }
-    }
-
-    /**
-     * @return true if the stream was added through {@link #addKVOutputField}; false for
-     *     non-keyed and unknown streams
-     */
-    public boolean keyedEmit(String streamId) {
-        return Boolean.TRUE.equals(keyStreams.get(streamId));
-    }
-
-    /** @return the parallelism set for this component; 0 until one is set */
-    public int getParallelismNum() {
-        return parallelismNum;
-    }
-
-    /** Sets the parallelism of this component. */
-    public void setParallelismNum(int num) {
-        parallelismNum = num;
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicBolt.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicBolt.java
deleted file mode 100644
index d1308af..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicBolt.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import backtype.storm.topology.IRichBatchBolt;
-
-/**
- * Base type for bolts: combines the output-stream bookkeeping inherited from
- * {@link AbstractComponent} with the {@link IRichBatchBolt} interface. It declares no
- * members of its own.
- */
-public abstract class AdaptorBasicBolt extends AbstractComponent implements IRichBatchBolt {
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicSpout.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicSpout.java
deleted file mode 100644
index 2f77bfb..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicSpout.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import backtype.storm.topology.IRichSpout;
-
-/**
- * Base type for spouts: combines the output-stream bookkeeping inherited from
- * {@link AbstractComponent} with the {@link IRichSpout} interface. It declares no
- * members of its own.
- */
-public abstract class AdaptorBasicSpout extends AbstractComponent implements IRichSpout {
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnExecutor.java
deleted file mode 100644
index 9d88c4d..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnExecutor.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import java.io.Serializable;
-import java.util.*;
-
-import avro.shaded.com.google.common.collect.Iterables;
-import com.alibaba.jstorm.beam.translation.runtime.state.JStormStateInternals;
-import com.alibaba.jstorm.beam.translation.runtime.timer.JStormTimerInternals;
-
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.metric.MetricClient;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
-import org.apache.beam.runners.core.SideInputHandler;
-import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.runners.core.NullSideInputReader;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-import com.alibaba.jstorm.beam.translation.util.DefaultStepContext;
-import com.alibaba.jstorm.beam.util.SerializedPipelineOptions;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * {@link Executor} that runs a Beam {@link DoFn} inside an {@link ExecutorsBolt}.
- *
- * <p>Elements tagged with the main input tag are handed to the DoFn runner; elements with any
- * other tag are treated as side-input values. When side inputs are configured, main-input
- * elements that the pushback runner hands back are kept in bag state and retried whenever a
- * side-input value arrives, or flushed by {@link #processAllPushBackElements()}.
- *
- * @param <InputT> element type of the main input
- * @param <OutputT> element type of the main output
- */
-public class DoFnExecutor<InputT, OutputT> implements Executor {
-  private static final long serialVersionUID = 5297603063991078668L;
-
-  private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class);
-
-  /** Output manager that feeds every DoFn output back into the owning bolt. */
-  public class DoFnExecutorOutputManager implements OutputManager, Serializable {
-    private static final long serialVersionUID = -661113364735206170L;
-
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      executorsBolt.processExecutorElem(tag, output);
-    }
-  }
-
-  protected transient DoFnRunner<InputT, OutputT> runner = null;
-  // Only created when side inputs are configured; null otherwise.
-  protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null;
-
-  protected final String stepName;
-
-  // Assigned by ExecutorsBolt#registerExecutor through setInternalDoFnExecutorId.
-  protected int internalDoFnExecutorId;
-
-  protected final String description;
-
-  protected final TupleTag<OutputT> mainTupleTag;
-  protected final List<TupleTag<?>> sideOutputTags;
-
-  protected SerializedPipelineOptions serializedOptions;
-  protected transient StormPipelineOptions pipelineOptions;
-
-  protected DoFn<InputT, OutputT> doFn;
-  protected final Coder<WindowedValue<InputT>> inputCoder;
-  protected DoFnInvoker<InputT, OutputT> doFnInvoker;
-  protected OutputManager outputManager;
-  protected WindowingStrategy<?, ?> windowingStrategy;
-  protected final TupleTag<InputT> mainInputTag;
-  protected Collection<PCollectionView<?>> sideInputs;
-  protected SideInputHandler sideInputHandler;
-  protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView;
-
-  // Initialize during runtime
-  protected ExecutorContext executorContext;
-  protected ExecutorsBolt executorsBolt;
-  protected TimerInternals timerInternals;
-  protected transient StateInternals pushbackStateInternals;
-  protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
-  protected transient StateTag<WatermarkHoldState> watermarkHoldTag;
-  protected transient IKvStoreManager kvStoreManager;
-  protected DefaultStepContext stepContext;
-  protected transient MetricClient metricClient;
-
-  /**
-   * Creates an executor; nothing is wired to the runtime until {@link #init(ExecutorContext)}.
-   *
-   * @param stepName name used for metrics reporting; must not be null
-   * @param description text returned by {@link #toString()}; must not be null
-   * @param pipelineOptions options, stored in serialized form
-   * @param doFn the user function to execute
-   * @param inputCoder coder used for the bag state of pushed-back elements
-   * @param windowingStrategy windowing strategy passed to the DoFn runner
-   * @param mainInputTag tag that identifies main-input elements in {@link #process}
-   * @param sideInputs side-input views; may be null or empty when there are none
-   * @param sideInputTagToView lookup from an incoming side-input tag to its view
-   * @param mainTupleTag tag of the main output
-   * @param sideOutputTags tags of the additional outputs
-   */
-  public DoFnExecutor(
-      String stepName,
-      String description,
-      StormPipelineOptions pipelineOptions,
-      DoFn<InputT, OutputT> doFn,
-      Coder<WindowedValue<InputT>> inputCoder,
-      WindowingStrategy<?, ?> windowingStrategy,
-      TupleTag<InputT> mainInputTag,
-      Collection<PCollectionView<?>> sideInputs,
-      Map<TupleTag, PCollectionView<?>> sideInputTagToView,
-      TupleTag<OutputT> mainTupleTag,
-      List<TupleTag<?>> sideOutputTags) {
-    this.stepName = checkNotNull(stepName, "stepName");
-    this.description = checkNotNull(description, "description");
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-    this.doFn = doFn;
-    this.inputCoder = inputCoder;
-    this.outputManager = new DoFnExecutorOutputManager();
-    this.windowingStrategy = windowingStrategy;
-    this.mainInputTag = mainInputTag;
-    this.sideInputs = sideInputs;
-    this.mainTupleTag = mainTupleTag;
-    this.sideOutputTags = sideOutputTags;
-    this.sideInputTagToView = sideInputTagToView;
-  }
-
-  /**
-   * Builds the metrics-reporting runner around a simple DoFn runner. Falls back to an empty
-   * side-input reader when no {@link SideInputHandler} has been created.
-   */
-  protected DoFnRunner<InputT, OutputT> getDoFnRunner() {
-    return new DoFnRunnerWithMetrics<>(
-        stepName,
-        DoFnRunners.simpleRunner(
-            this.pipelineOptions,
-            this.doFn,
-            this.sideInputHandler == null ? NullSideInputReader.empty() : sideInputHandler,
-            this.outputManager,
-            this.mainTupleTag,
-            this.sideOutputTags,
-            this.stepContext,
-            this.windowingStrategy),
-        MetricsReporter.create(metricClient));
-  }
-
-  /**
-   * Creates the timer internals, step context and metric client from {@code context}.
-   *
-   * <p>Everything is read from the {@code context} argument rather than from fields, so the
-   * method does not depend on {@link #init(ExecutorContext)} having assigned them first.
-   */
-  protected void initService(ExecutorContext context) {
-    TimerService timerService = context.getExecutorsBolt().timerService();
-    // TODO: what should be set for key in here?
-    timerInternals = new JStormTimerInternals(null /* key */, this, timerService);
-    kvStoreManager = context.getKvStoreManager();
-    stepContext = new DefaultStepContext(
-        timerInternals,
-        new JStormStateInternals(null, kvStoreManager, timerService, internalDoFnExecutorId));
-    metricClient = new MetricClient(context.getTopologyContext());
-  }
-
-  /**
-   * Wires the executor to its bolt, creates the runner(s) and invokes the DoFn's setup.
-   */
-  @Override
-  public void init(ExecutorContext context) {
-    this.executorContext = context;
-    this.executorsBolt = context.getExecutorsBolt();
-    this.pipelineOptions =
-        this.serializedOptions.getPipelineOptions().as(StormPipelineOptions.class);
-
-    initService(context);
-
-    // Side inputs setup
-    if (hasSideInputs()) {
-      pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
-      watermarkHoldTag =
-          StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST);
-      pushbackStateInternals = new JStormStateInternals(
-          null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
-      // The handler must exist before getDoFnRunner(), which passes it to the simple runner.
-      sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals);
-      runner = getDoFnRunner();
-      pushbackRunner =
-          SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler);
-    } else {
-      runner = getDoFnRunner();
-    }
-
-    // Process user's setup
-    doFnInvoker = DoFnInvokers.invokerFor(doFn);
-    doFnInvoker.invokeSetup();
-  }
-
-  /**
-   * Dispatches an element to main-input or side-input handling depending on its tag.
-   */
-  @Override
-  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
-    // Parameterized logging: the element is actually rendered, and only when debug is enabled.
-    LOG.debug("process: elemTag={}, mainInputTag={}, sideInputs={}, elem={}",
-        tag, mainInputTag, sideInputs, elem.getValue());
-    if (mainInputTag.equals(tag)) {
-      processMainInput(elem);
-    } else {
-      processSideInput(tag, elem);
-    }
-  }
-
-  /**
-   * Runs a main-input element. Without side inputs it goes straight to the runner; otherwise
-   * whatever the pushback runner hands back is stored in bag state and the watermark hold is
-   * updated with the earliest pushed-back timestamp.
-   */
-  protected <T> void processMainInput(WindowedValue<T> elem) {
-    // process() only routes here when the tag equals mainInputTag, so the cast is expected to hold.
-    @SuppressWarnings("unchecked")
-    WindowedValue<InputT> input = (WindowedValue<InputT>) elem;
-
-    // Same condition init() uses, so a null sideInputs collection no longer throws here.
-    if (!hasSideInputs()) {
-      runner.processElement(input);
-      return;
-    }
-
-    Iterable<WindowedValue<InputT>> justPushedBack =
-        pushbackRunner.processElementInReadyWindows(input);
-    BagState<WindowedValue<InputT>> pushedBack =
-        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
-    Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
-    for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
-      min = earlier(min, pushedBackValue.getTimestamp());
-      pushedBack.add(pushedBackValue);
-    }
-    pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min);
-  }
-
-  /**
-   * Registers a side-input value, then retries every pushed-back main-input element and
-   * rewrites the bag state and watermark hold with what is still pushed back.
-   */
-  protected void processSideInput(TupleTag tag, WindowedValue elem) {
-    LOG.debug("side inputs: {}, {}.", tag, elem);
-
-    PCollectionView<?> sideInputView = sideInputTagToView.get(tag);
-    sideInputHandler.addSideInputValue(sideInputView, elem);
-
-    BagState<WindowedValue<InputT>> pushedBack =
-        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
-    List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
-
-    Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read();
-    if (pushedBackInputs != null) {
-      for (WindowedValue<InputT> input : pushedBackInputs) {
-        Iterable<WindowedValue<InputT>> justPushedBack =
-            pushbackRunner.processElementInReadyWindows(input);
-        Iterables.addAll(newPushedBack, justPushedBack);
-      }
-    }
-    pushedBack.clear();
-
-    Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
-    for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
-      min = earlier(min, pushedBackValue.getTimestamp());
-      pushedBack.add(pushedBackValue);
-    }
-
-    WatermarkHoldState watermarkHold =
-        pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
-    // TODO: clear-then-add is not thread-safe.
-    watermarkHold.clear();
-    watermarkHold.add(min);
-  }
-
-  /**
-   * Process all pushed back elements when receiving watermark with max timestamp.
-   *
-   * <p>Elements go directly to the plain runner (bypassing the pushback runner), the bag is
-   * cleared and the watermark hold is reset to the maximum timestamp. Does nothing when no
-   * side inputs are configured.
-   */
-  public void processAllPushBackElements() {
-    if (!hasSideInputs()) {
-      return;
-    }
-
-    BagState<WindowedValue<InputT>> pushedBackElements =
-        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-    if (pushedBackElements != null) {
-      // read() is null-checked here for the same reason processSideInput() checks it.
-      Iterable<WindowedValue<InputT>> pending = pushedBackElements.read();
-      if (pending != null) {
-        for (WindowedValue<InputT> elem : pending) {
-          LOG.info("Process pushback elem={}", elem);
-          runner.processElement(elem);
-        }
-      }
-      pushedBackElements.clear();
-    }
-
-    WatermarkHoldState watermarkHold =
-        pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
-    watermarkHold.clear();
-    watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE);
-  }
-
-  /**
-   * Fires a timer on the active runner.
-   *
-   * @param key not used by this implementation
-   * @param timerData timer to fire; its namespace must be a window namespace
-   * @throws IllegalArgumentException if the timer's namespace is not a window namespace
-   */
-  public void onTimer(Object key, TimerInternals.TimerData timerData) {
-    StateNamespace namespace = timerData.getNamespace();
-    checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
-    BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
-    if (pushbackRunner != null) {
-      pushbackRunner.onTimer(
-          timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
-    } else {
-      runner.onTimer(
-          timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
-    }
-  }
-
-  /** Invokes the DoFn's teardown, if {@link #init(ExecutorContext)} got far enough to create the invoker. */
-  @Override
-  public void cleanup() {
-    // Guarded so that cleaning up an executor whose init() never completed does not throw.
-    if (doFnInvoker != null) {
-      doFnInvoker.invokeTeardown();
-    }
-  }
-
-  @Override
-  public String toString() {
-    return description;
-  }
-
-  /** Returns {@code left} if it is strictly before {@code right}, otherwise {@code right}. */
-  private Instant earlier(Instant left, Instant right) {
-    return left.isBefore(right) ? left : right;
-  }
-
-  /** True when at least one side input is configured; the condition under which init() creates the pushback runner. */
-  private boolean hasSideInputs() {
-    return sideInputs != null && !sideInputs.isEmpty();
-  }
-
-  /** Starts a bundle on the pushback runner when there is one, otherwise on the plain runner. */
-  public void startBundle() {
-    if (pushbackRunner != null) {
-      pushbackRunner.startBundle();
-    } else {
-      runner.startBundle();
-    }
-  }
-
-  /** Finishes a bundle on the pushback runner when there is one, otherwise on the plain runner. */
-  public void finishBundle() {
-    if (pushbackRunner != null) {
-      pushbackRunner.finishBundle();
-    } else {
-      runner.finishBundle();
-    }
-  }
-
-  public void setInternalDoFnExecutorId(int id) {
-    this.internalDoFnExecutorId = id;
-  }
-
-  public int getInternalDoFnExecutorId() {
-    return internalDoFnExecutorId;
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnRunnerWithMetrics.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnRunnerWithMetrics.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnRunnerWithMetrics.java
deleted file mode 100644
index 105dffb..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnRunnerWithMetrics.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.Closeable;
-import java.io.IOException;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.sdk.metrics.MetricsContainer;
-import org.apache.beam.sdk.metrics.MetricsEnvironment;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.joda.time.Instant;
-
-/**
- * {@link DoFnRunner} wrapper that makes the step's {@link MetricsContainer} the scoped
- * container for the duration of every call forwarded to the wrapped runner.
- */
-public class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
-
-  private final String stepName;
-  private final DoFnRunner<InputT, OutputT> delegate;
-  private final MetricsReporter metricsReporter;
-
-  DoFnRunnerWithMetrics(
-      String stepName,
-      DoFnRunner<InputT, OutputT> delegate,
-      MetricsReporter metricsReporter) {
-    this.stepName = checkNotNull(stepName, "stepName");
-    this.delegate = checkNotNull(delegate, "delegate");
-    this.metricsReporter = checkNotNull(metricsReporter, "metricsReporter");
-  }
-
-  /** Opens the metrics scope for this step; closing the returned handle ends the scope. */
-  private Closeable openMetricsScope() {
-    MetricsContainer stepContainer = metricsReporter.getMetricsContainer(stepName);
-    return MetricsEnvironment.scopedMetricsContainer(stepContainer);
-  }
-
-  @Override
-  public void startBundle() {
-    try (Closeable metricsScope = openMetricsScope()) {
-      delegate.startBundle();
-    } catch (IOException closeFailure) {
-      throw new RuntimeException(closeFailure);
-    }
-  }
-
-  @Override
-  public void processElement(WindowedValue<InputT> elem) {
-    try (Closeable metricsScope = openMetricsScope()) {
-      delegate.processElement(elem);
-    } catch (IOException closeFailure) {
-      throw new RuntimeException(closeFailure);
-    }
-  }
-
-  @Override
-  public void onTimer(
-      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
-    try (Closeable metricsScope = openMetricsScope()) {
-      delegate.onTimer(timerId, window, timestamp, timeDomain);
-    } catch (IOException closeFailure) {
-      throw new RuntimeException(closeFailure);
-    }
-  }
-
-  /** Finishes the wrapped runner's bundle inside the metrics scope, then pushes the metrics update. */
-  @Override
-  public void finishBundle() {
-    try (Closeable metricsScope = openMetricsScope()) {
-      delegate.finishBundle();
-    } catch (IOException closeFailure) {
-      throw new RuntimeException(closeFailure);
-    }
-    // Runs only after the scope has been closed, as before.
-    metricsReporter.updateMetrics();
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/Executor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/Executor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/Executor.java
deleted file mode 100644
index 30348b2..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/Executor.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.alibaba.jstorm.utils.Pair;
-
-/**
- * A serializable processing unit hosted inside an {@link ExecutorsBolt}.
- */
-public interface Executor extends Serializable {
- /**
- * Initialization during runtime. {@link ExecutorsBolt#prepare} calls this for each registered
- * executor before any element is processed.
- *
- * @param context runtime handles (topology context, owning bolt, kv store manager)
- */
- void init(ExecutorContext context);
-
- /**
- * Processes one element.
- *
- * @param tag the tag the element arrived under
- * @param elem the element to process
- */
- <T> void process(TupleTag<T> tag, WindowedValue<T> elem);
-
- /**
- * Releases whatever the executor holds; {@link ExecutorsBolt#cleanup} calls this for each
- * registered executor.
- */
- void cleanup();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorContext.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorContext.java
deleted file mode 100644
index 7f9aa77..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorContext.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import backtype.storm.task.TopologyContext;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.google.auto.value.AutoValue;
-
-/**
- * Value type bundling the runtime handles passed to {@code Executor#init}: the Storm topology
- * context, the owning {@link ExecutorsBolt} and the kv store manager. The implementation is
- * generated by AutoValue.
- */
-@AutoValue
-public abstract class ExecutorContext {
- /** Creates a context holding the three given handles. */
- public static ExecutorContext of(TopologyContext topologyContext, ExecutorsBolt bolt, IKvStoreManager kvStoreManager) {
- return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager);
- }
-
- /** Returns the topology context given to {@link #of}. */
- public abstract TopologyContext getTopologyContext();
-
- /** Returns the bolt given to {@link #of}. */
- public abstract ExecutorsBolt getExecutorsBolt();
-
- /** Returns the kv store manager given to {@link #of}. */
- public abstract IKvStoreManager getKvStoreManager();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorsBolt.java
deleted file mode 100644
index ebd9456..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorsBolt.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import java.io.IOException;
-import java.util.*;
-
-import avro.shaded.com.google.common.base.Joiner;
-import avro.shaded.com.google.common.collect.Sets;
-import backtype.storm.tuple.ITupleExt;
-import backtype.storm.tuple.TupleImplExt;
-import com.alibaba.jstorm.beam.translation.util.CommonInstance;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.cache.KvStoreIterable;
-import com.alibaba.jstorm.cache.KvStoreManagerFactory;
-import com.alibaba.jstorm.cluster.Common;
-import com.alibaba.jstorm.utils.KryoSerializer;
-import com.alibaba.jstorm.window.Watermark;
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Bolt that hosts a group of {@link Executor}s and routes elements between them.
- *
- * <p>Each incoming tuple batch is either a watermark batch (advances the timer service and
- * fires timers) or a data batch (each value is deserialized and dispatched by stream id to
- * the executor registered for that tag). Elements whose tag is registered as an external
- * output tag are also emitted on the collector.
- */
-public class ExecutorsBolt extends AdaptorBasicBolt {
-  private static final long serialVersionUID = -7751043327801735211L;
-
-  private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class);
-
-  protected ExecutorContext executorContext;
-
-  protected TimerService timerService;
-
-  // map from input tag to executor inside bolt
-  protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap();
-  // set of all output tags that will be emit outside bolt
-  protected final Set<TupleTag> outputTags = Sets.newHashSet();
-  protected final Set<TupleTag> externalOutputTags = Sets.newHashSet();
-  // DoFn executors collected in prepare(); these receive the start/finish bundle calls.
-  protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet();
-  // Next id handed out by registerExecutor(); starts at 1.
-  protected int internalDoFnExecutorId = 1;
-  protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap();
-
-  protected OutputCollector collector;
-
-  protected boolean isStatefulBolt = false;
-
-  protected KryoSerializer<WindowedValue> serializer;
-
-  public ExecutorsBolt() {
-
-  }
-
-  public void setStatefulBolt(boolean isStateful) {
-    isStatefulBolt = isStateful;
-  }
-
-  /**
-   * Registers {@code executor} as the handler for elements tagged {@code inputTag}.
-   *
-   * @throws NullPointerException if either argument is null
-   */
-  public void addExecutor(TupleTag inputTag, Executor executor) {
-    inputTagToExecutor.put(
-        checkNotNull(inputTag, "inputTag"),
-        checkNotNull(executor, "executor"));
-  }
-
-  public Map<TupleTag, Executor> getExecutors() {
-    return inputTagToExecutor;
-  }
-
-  /**
-   * Assigns the next internal id to {@code executor} if it is a {@link DoFnExecutor}; other
-   * executor types are ignored.
-   */
-  public void registerExecutor(Executor executor) {
-    if (executor instanceof DoFnExecutor) {
-      DoFnExecutor doFnExecutor = (DoFnExecutor) executor;
-      idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor);
-      doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId);
-      internalDoFnExecutorId++;
-    }
-  }
-
-  public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() {
-    return idToDoFnExecutor;
-  }
-
-  public void addOutputTags(TupleTag tag) {
-    outputTags.add(tag);
-  }
-
-  public void addExternalOutputTag(TupleTag<?> tag) {
-    externalOutputTags.add(tag);
-  }
-
-  public Set<TupleTag> getOutputTags() {
-    return outputTags;
-  }
-
-  public ExecutorContext getExecutorContext() {
-    return executorContext;
-  }
-
-  /**
-   * Creates the kv store manager, executor context, timer service and serializer, then
-   * initializes every registered executor.
-   *
-   * @throws RuntimeException wrapping the {@link IOException} if preparation fails
-   */
-  @Override
-  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-    LOG.info("Start to prepare for task-{}", context.getThisTaskId());
-    try {
-      this.collector = collector;
-
-      // init kv store manager
-      String storeName = String.format("task-%d", context.getThisTaskId());
-      String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
-      IKvStoreManager kvStoreManager;
-      if (isStatefulBolt) {
-        kvStoreManager = KvStoreManagerFactory.getKvStoreManagerWithMonitor(
-            context, storeName, stateStorePath, isStatefulBolt);
-      } else {
-        kvStoreManager = KvStoreManagerFactory.getKvStoreManager(
-            stormConf, storeName, stateStorePath, isStatefulBolt);
-      }
-      this.executorContext = ExecutorContext.of(context, this, kvStoreManager);
-
-      // init time service; must follow the executorContext assignment, which it reads
-      timerService = initTimerService();
-
-      // init all internal executors; the set copy makes an executor registered under
-      // several tags get initialized only once
-      for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
-        executor.init(executorContext);
-        if (executor instanceof DoFnExecutor) {
-          doFnExecutors.add((DoFnExecutor) executor);
-        }
-      }
-
-      this.serializer = new KryoSerializer<WindowedValue>(stormConf);
-
-      LOG.info("ExecutorsBolt finished init. LocalExecutors={}", inputTagToExecutor.values());
-      LOG.info("inputTagToExecutor={}", inputTagToExecutor);
-      LOG.info("outputTags={}", outputTags);
-      LOG.info("externalOutputTags={}", externalOutputTags);
-      LOG.info("doFnExecutors={}", doFnExecutors);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to prepare executors bolt", e);
-    }
-  }
-
-  /**
-   * Creates a timer service initialized with the task ids of all non-system source
-   * components of this bolt.
-   */
-  public TimerService initTimerService() {
-    TopologyContext context = executorContext.getTopologyContext();
-    List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet())
-        .transformAndConcat(
-            new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() {
-              @Override
-              public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) {
-                if (Common.isSystemComponent(value.getKey())) {
-                  // Typed empty list instead of the raw Collections.EMPTY_LIST constant.
-                  return Collections.<Integer>emptyList();
-                } else {
-                  return value.getValue();
-                }
-              }
-            })
-        .toList();
-    TimerService ret = new TimerServiceImpl(executorContext);
-    ret.init(tasks);
-    return ret;
-  }
-
-  /**
-   * Processes one tuple batch: watermark values when the tuple arrived on the watermark
-   * stream, otherwise data elements wrapped in a start/finish bundle pair.
-   */
-  @Override
-  public void execute(Tuple input) {
-    // process a batch
-    String streamId = input.getSourceStreamId();
-    ITupleExt tuple = (ITupleExt) input;
-    Iterator<List<Object>> valueIterator = tuple.batchValues().iterator();
-    if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) {
-      while (valueIterator.hasNext()) {
-        processWatermark((Long) valueIterator.next().get(0), input.getSourceTask());
-      }
-    } else {
-      doFnStartBundle();
-      while (valueIterator.hasNext()) {
-        processElement(valueIterator.next(), streamId);
-      }
-      doFnFinishBundle();
-    }
-  }
-
-  /**
-   * Feeds a watermark from {@code sourceTask} to the timer service, fires timers when the
-   * service reports a non-zero new watermark, and forwards the current output watermark if
-   * this bolt has external outputs.
-   */
-  private void processWatermark(long watermarkTs, int sourceTask) {
-    long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs);
-    // Guarded so the DateTime conversions are skipped when debug logging is off.
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}",
-          (new Instant(watermarkTs)).toDateTime(),
-          sourceTask,
-          (new Instant(newWaterMark)).toDateTime());
-    }
-    if (newWaterMark != 0) {
-      // Some buffer windows are going to be triggered.
-      doFnStartBundle();
-      timerService.fireTimers(newWaterMark);
-
-      // SideInput: If receiving water mark with max timestamp, It means no more data is supposed
-      // to be received from now on. So we are going to process all push back data.
-      if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
-        for (DoFnExecutor doFnExecutor : doFnExecutors) {
-          doFnExecutor.processAllPushBackElements();
-        }
-      }
-
-      doFnFinishBundle();
-    }
-
-    long currentWaterMark = timerService.currentOutputWatermark();
-    if (!externalOutputTags.isEmpty()) {
-      collector.flush();
-      collector.emit(
-          CommonInstance.BEAM_WATERMARK_STREAM_ID,
-          new Values(currentWaterMark));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime());
-      }
-    }
-  }
-
-  /** Deserializes one tuple value list and dispatches it under a tag built from the stream id. */
-  private void processElement(List<Object> values, String streamId) {
-    TupleTag inputTag = new TupleTag(streamId);
-    WindowedValue windowedValue = retrieveWindowedValueFromTupleValue(values);
-    processExecutorElem(inputTag, windowedValue);
-  }
-
-  /**
-   * Hands {@code elem} to the executor registered for {@code inputTag}, if any, and also
-   * emits it on the collector when the tag is an external output tag.
-   *
-   * <p>A null element is logged and dropped.
-   */
-  public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) {
-    // The null check comes first: logging elem.getValue() before it threw a
-    // NullPointerException and made the null branch unreachable.
-    if (elem == null) {
-      LOG.info("Received null elem for tag={}", inputTag);
-      return;
-    }
-    LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue());
-
-    Executor executor = inputTagToExecutor.get(inputTag);
-    if (executor != null) {
-      executor.process(inputTag, elem);
-    }
-    if (externalOutputTags.contains(inputTag)) {
-      emitOutsideBolt(inputTag, elem);
-    }
-  }
-
-  /**
-   * Cleans up every distinct executor and closes the kv store manager. The store is closed
-   * even when an executor's cleanup throws; the exception still propagates.
-   */
-  @Override
-  public void cleanup() {
-    try {
-      for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
-        executor.cleanup();
-      }
-    } finally {
-      executorContext.getKvStoreManager().close();
-    }
-  }
-
-  @Override
-  public Map<String, Object> getComponentConfiguration() {
-    return null;
-  }
-
-  public TimerService timerService() {
-    return timerService;
-  }
-
-  public void setTimerService(TimerService service) {
-    timerService = service;
-  }
-
-  /**
-   * Rebuilds a {@link WindowedValue} from tuple values: a single value is the serialized
-   * element; two values are a key followed by the serialized element, recombined into a
-   * {@link KV}. This is the inverse of the two shapes written by {@link #emitOutsideBolt}.
-   */
-  private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) {
-    WindowedValue wv = null;
-    if (values.size() > 1) {
-      Object key = values.get(0);
-      WindowedValue value = serializer.deserialize((byte[]) values.get(1));
-      wv = value.withValue(KV.of(key, value.getValue()));
-    } else {
-      wv = serializer.deserialize((byte[]) values.get(0));
-    }
-    return wv;
-  }
-
-  /**
-   * Emits {@code outputValue} on the stream named after {@code outputTag}. For keyed streams
-   * the key is emitted as a separate tuple field ahead of the serialized value.
-   */
-  protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) {
-    LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue());
-    if (keyedEmit(outputTag.getId())) {
-      KV kv = (KV) outputValue.getValue();
-      byte[] immutableOutputValue = serializer.serialize(outputValue.withValue(kv.getValue()));
-      // Convert WindowedValue<KV> to <K, WindowedValue<V>>
-      if (kv.getKey() == null) {
-        // If key is null, emit "null" string here. Because, null value will be ignored in JStorm.
-        collector.emit(outputTag.getId(), new Values("null", immutableOutputValue));
-      } else {
-        collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableOutputValue));
-      }
-    } else {
-      byte[] immutableOutputValue = serializer.serialize(outputValue);
-      collector.emit(outputTag.getId(), new Values(immutableOutputValue));
-    }
-  }
-
-  private void doFnStartBundle() {
-    for (DoFnExecutor doFnExecutor : doFnExecutors) {
-      doFnExecutor.startBundle();
-    }
-  }
-
-  private void doFnFinishBundle() {
-    for (DoFnExecutor doFnExecutor : doFnExecutors) {
-      doFnExecutor.finishBundle();
-    }
-  }
-
-  /**
-   * Returns a newline-separated listing of the internal executors and the external output
-   * tag ids, terminated by a newline.
-   */
-  @Override
-  public String toString() {
-    List<String> ret = new ArrayList<>();
-    ret.add("internalExecutors");
-    for (Executor executor : inputTagToExecutor.values()) {
-      ret.add(executor.toString());
-    }
-    ret.add("externalOutputTags");
-    for (TupleTag output : externalOutputTags) {
-      ret.add(output.getId());
-    }
-    return Joiner.on('\n').join(ret).concat("\n");
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/FlattenExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/FlattenExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/FlattenExecutor.java
deleted file mode 100644
index 7158b2f..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/FlattenExecutor.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-public class FlattenExecutor<InputT> implements Executor {
-
- private final String description;
- private TupleTag mainOutputTag;
- private ExecutorContext context;
- private ExecutorsBolt executorsBolt;
-
- public FlattenExecutor(String description, TupleTag mainTupleTag) {
- this.description = checkNotNull(description, "description");
- this.mainOutputTag = mainTupleTag;
- }
-
- @Override
- public void init(ExecutorContext context) {
- this.context = context;
- this.executorsBolt = context.getExecutorsBolt();
- }
-
- @Override
- public void process(TupleTag tag, WindowedValue elem) {
- executorsBolt.processExecutorElem(mainOutputTag, elem);
- }
-
- @Override
- public void cleanup() {}
-
- @Override
- public String toString() {
- return description;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/GroupByWindowExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/GroupByWindowExecutor.java
deleted file mode 100644
index 1958c77..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/GroupByWindowExecutor.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import java.io.Serializable;
-import java.util.List;
-
-import com.alibaba.jstorm.beam.translation.runtime.state.JStormStateInternals;
-import com.alibaba.jstorm.beam.translation.runtime.timer.JStormTimerInternals;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.KeyedWorkItems;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateInternalsFactory;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.TimerInternalsFactory;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.runners.core.NullSideInputReader;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-import com.alibaba.jstorm.beam.translation.TranslationContext.UserGraphContext;
-import com.alibaba.jstorm.beam.translation.util.DefaultStepContext;
-import com.alibaba.jstorm.beam.util.RunnerUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-public class GroupByWindowExecutor<K, V> extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
- private static final long serialVersionUID = -7563050475488610553L;
-
- private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class);
-
- private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable {
-
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- executorsBolt.processExecutorElem(tag, output);
- }
- }
-
- private KvCoder<K, V> inputKvCoder;
- private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
-
- public GroupByWindowExecutor(
- String stepName,
- String description,
- TranslationContext context,
- StormPipelineOptions pipelineOptions,
- WindowingStrategy<?, ?> windowingStrategy,
- TupleTag<KV<K, Iterable<V>>> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
- // The doFn will be created when runtime. Just pass "null" here
- super(stepName, description, pipelineOptions, null, null, windowingStrategy, null, null, null, mainTupleTag, sideOutputTags);
-
- this.outputManager = new GroupByWindowOutputManager();
- UserGraphContext userGraphContext = context.getUserGraphContext();
- PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) userGraphContext.getInput();
- this.inputKvCoder = (KvCoder<K, V>) input.getCoder();
- }
-
- private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() {
- final StateInternalsFactory<K> stateFactory = new StateInternalsFactory<K>() {
- @Override
- public StateInternals stateInternalsForKey(K key) {
- return new JStormStateInternals<K>(key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
- }
- };
- TimerInternalsFactory<K> timerFactory = new TimerInternalsFactory<K>() {
- @Override
- public TimerInternals timerInternalsForKey(K key) {
- return new JStormTimerInternals<>(key, GroupByWindowExecutor.this, executorContext.getExecutorsBolt().timerService());
- }
- };
-
- reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder());
- DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFn =
- GroupAlsoByWindowViaWindowSetNewDoFn.create(
- windowingStrategy, stateFactory, timerFactory, NullSideInputReader.empty(),
- (SystemReduceFn) reduceFn, outputManager, mainTupleTag);
- return doFn;
- }
-
- @Override
- protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() {
- doFn = getGroupByWindowDoFn();
-
- DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> simpleRunner = DoFnRunners.<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>simpleRunner(
- this.pipelineOptions,
- this.doFn,
- NullSideInputReader.empty(),
- this.outputManager,
- this.mainTupleTag,
- this.sideOutputTags,
- this.stepContext,
- this.windowingStrategy);
-
- DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFnRunner = DoFnRunners.lateDataDroppingRunner(
- simpleRunner,
- this.stepContext,
- this.windowingStrategy);
- return new DoFnRunnerWithMetrics<>(
- stepName, doFnRunner, MetricsReporter.create(metricClient));
- }
-
- @Override
- public void process(TupleTag tag, WindowedValue elem) {
- /**
- * For GroupByKey, KV type elem is received. We need to convert the KV elem
- * into KeyedWorkItem first, which is the expected type in LateDataDroppingDoFnRunner.
- */
- KeyedWorkItem<K, V> keyedWorkItem = RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem);
- runner.processElement(elem.withValue(keyedWorkItem));
- }
-
- @Override
- public void onTimer(Object key, TimerInternals.TimerData timerData) {
- StateNamespace namespace = timerData.getNamespace();
- checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
-
- runner.processElement(
- WindowedValue.valueInGlobalWindow(
- KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData))));
- }
-
- @Override
- public String toString() {
- return super.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MetricsReporter.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MetricsReporter.java
deleted file mode 100644
index 33095b1..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MetricsReporter.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
-
-import com.alibaba.jstorm.common.metric.AsmCounter;
-import com.alibaba.jstorm.common.metric.AsmGauge;
-import com.alibaba.jstorm.common.metric.AsmHistogram;
-import com.alibaba.jstorm.common.metric.AsmMeter;
-import com.alibaba.jstorm.common.metric.AsmMetric;
-import com.alibaba.jstorm.metric.MetricClient;
-import com.google.common.collect.Maps;
-import java.util.Map;
-import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
-import org.apache.beam.sdk.metrics.DistributionResult;
-import org.apache.beam.sdk.metrics.GaugeResult;
-import org.apache.beam.sdk.metrics.MetricQueryResults;
-import org.apache.beam.sdk.metrics.MetricResult;
-import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.metrics.MetricsContainer;
-import org.apache.beam.sdk.metrics.MetricsFilter;
-
-/**
- * Class that holds a {@link MetricsContainerStepMap}, and reports metrics to JStorm engine.
- */
-public class MetricsReporter {
-
- private static final String METRIC_KEY_SEPARATOR = "__";
- private static final String COUNTER_PREFIX = "__counter";
-
- private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap();
- private final Map<String, Long> reportedCounters = Maps.newHashMap();
- private final MetricClient metricClient;
-
- public static MetricsReporter create(MetricClient metricClient) {
- return new MetricsReporter(metricClient);
- }
-
- private MetricsReporter(MetricClient metricClient) {
- this.metricClient = checkNotNull(metricClient, "metricClient");
- }
-
- public MetricsContainer getMetricsContainer(String stepName) {
- return metricsContainers.getContainer(stepName);
- }
-
- public void updateMetrics() {
- MetricResults metricResults = asAttemptedOnlyMetricResults(metricsContainers);
- MetricQueryResults metricQueryResults =
- metricResults.queryMetrics(MetricsFilter.builder().build());
- updateCounters(metricQueryResults.counters());
- }
-
- private void updateCounters(Iterable<MetricResult<Long>> counters) {
- System.out.print("updateCounters");
- for (MetricResult<Long> metricResult : counters) {
- String metricName = getMetricNameString(COUNTER_PREFIX, metricResult);
- System.out.print("metricName: " + metricName);
- Long updateValue = metricResult.attempted();
- Long oldValue = reportedCounters.get(metricName);
-
- if (oldValue == null || oldValue < updateValue) {
- AsmCounter counter = metricClient.registerCounter(metricName);
- Long incValue = (oldValue == null ? updateValue : updateValue - oldValue);
- counter.update(incValue);
- }
- }
- }
-
- private String getMetricNameString(String prefix, MetricResult<?> metricResult) {
- return prefix
- + METRIC_KEY_SEPARATOR + metricResult.step()
- + METRIC_KEY_SEPARATOR + metricResult.name().namespace()
- + METRIC_KEY_SEPARATOR + metricResult.name().name();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiOutputDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiOutputDoFnExecutor.java
deleted file mode 100644
index bd3dfb3..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiOutputDoFnExecutor.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-public class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> {
- private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class);
-
- /**
- * For multi-output scenario,a "local" tuple tag is used in producer currently while a generated tag
- * is used in downstream consumer. So before output, we need to map this "local" tag to "external"
- * tag. See PCollectionTuple for details.
- */
- public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager {
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- if (localTupleTagMap.containsKey(tag)) {
- executorsBolt.processExecutorElem((TupleTag<T>) localTupleTagMap.get(tag), output);
- } else {
- executorsBolt.processExecutorElem(tag, output);
- }
- }
- }
-
- protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap;
-
- public MultiOutputDoFnExecutor(
- String stepName,
- String description,
- StormPipelineOptions pipelineOptions,
- DoFn<InputT, OutputT> doFn,
- Coder<WindowedValue<InputT>> inputCoder,
- WindowingStrategy<?, ?> windowingStrategy,
- TupleTag<InputT> mainInputTag,
- Collection<PCollectionView<?>> sideInputs,
- Map<TupleTag, PCollectionView<?>> sideInputTagToView,
- TupleTag<OutputT> mainTupleTag,
- List<TupleTag<?>> sideOutputTags,
- Map<TupleTag<?>, TupleTag<?>> localTupleTagMap
- ) {
- super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
- sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
- this.localTupleTagMap = localTupleTagMap;
- this.outputManager = new MultiOutputDoFnExecutorOutputManager();
- LOG.info("localTupleTagMap: {}", localTupleTagMap);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiStatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiStatefulDoFnExecutor.java
deleted file mode 100644
index 51aa960..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiStatefulDoFnExecutor.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-import com.alibaba.jstorm.beam.translation.runtime.state.JStormStateInternals;
-import com.alibaba.jstorm.beam.translation.runtime.timer.JStormTimerInternals;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-public class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> {
-
- public MultiStatefulDoFnExecutor(
- String stepName, String description,
- StormPipelineOptions pipelineOptions, DoFn<KV, OutputT> doFn,
- Coder<WindowedValue<KV>> inputCoder, WindowingStrategy<?, ?> windowingStrategy,
- TupleTag<KV> mainInputTag, Collection<PCollectionView<?>> sideInputs,
- Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag,
- List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
- super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags, localTupleTagMap);
- }
-
- @Override
- public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
- if (mainInputTag.equals(tag)) {
- WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
- stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this,
- executorContext.getExecutorsBolt().timerService()));
- stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(),
- kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
- processMainInput(elem);
- } else {
- processSideInput(tag, elem);
- }
- }
-
- @Override
- public void onTimer(Object key, TimerInternals.TimerData timerData) {
- stepContext.setStateInternals(new JStormStateInternals<>(key,
- kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
- super.onTimer(key, timerData);
- }
-}