You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:22 UTC
[22/53] [abbrv] beam git commit: jstorm-runner: move most classes to
translation package and reduce their visibility to package private.
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
deleted file mode 100644
index a26472c..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * JStorm {@link Executor} for {@link DoFn} with multi-output.
- * @param <InputT>
- * @param <OutputT>
- */
-public class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> {
- private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class);
-
- /**
- * For multi-output scenario,a "local" tuple tag is used in producer currently while a generated
- * tag is used in downstream consumer. So before output, we need to map this "local" tag to
- * "external" tag. See PCollectionTuple for details.
- */
- public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager {
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- if (localTupleTagMap.containsKey(tag)) {
- executorsBolt.processExecutorElem((TupleTag<T>) localTupleTagMap.get(tag), output);
- } else {
- executorsBolt.processExecutorElem(tag, output);
- }
- }
- }
-
- protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap;
-
- public MultiOutputDoFnExecutor(
- String stepName,
- String description,
- JStormPipelineOptions pipelineOptions,
- DoFn<InputT, OutputT> doFn,
- Coder<WindowedValue<InputT>> inputCoder,
- WindowingStrategy<?, ?> windowingStrategy,
- TupleTag<InputT> mainInputTag,
- Collection<PCollectionView<?>> sideInputs,
- Map<TupleTag, PCollectionView<?>> sideInputTagToView,
- TupleTag<OutputT> mainTupleTag,
- List<TupleTag<?>> sideOutputTags,
- Map<TupleTag<?>, TupleTag<?>> localTupleTagMap
- ) {
- super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
- sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
- this.localTupleTagMap = localTupleTagMap;
- this.outputManager = new MultiOutputDoFnExecutorOutputManager();
- LOG.info("localTupleTagMap: {}", localTupleTagMap);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
deleted file mode 100644
index 5e87cff..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
-import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * JStorm {@link Executor} for stateful {@link DoFn} with multi-output.
- * @param <OutputT>
- */
-public class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> {
-
- public MultiStatefulDoFnExecutor(
- String stepName, String description,
- JStormPipelineOptions pipelineOptions, DoFn<KV, OutputT> doFn,
- Coder<WindowedValue<KV>> inputCoder, WindowingStrategy<?, ?> windowingStrategy,
- TupleTag<KV> mainInputTag, Collection<PCollectionView<?>> sideInputs,
- Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag,
- List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
- super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
- sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags, localTupleTagMap);
- }
-
- @Override
- public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
- if (mainInputTag.equals(tag)) {
- WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
- stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this,
- executorContext.getExecutorsBolt().timerService()));
- stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(),
- kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
- processMainInput(elem);
- } else {
- processSideInput(tag, elem);
- }
- }
-
- @Override
- public void onTimer(Object key, TimerInternals.TimerData timerData) {
- stepContext.setStateInternals(new JStormStateInternals<>(key,
- kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
- super.onTimer(key, timerData);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
deleted file mode 100644
index 77ae844..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
-import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * JStorm {@link Executor} for stateful {@link DoFn}.
- * @param <OutputT>
- */
-public class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> {
- public StatefulDoFnExecutor(
- String stepName, String description, JStormPipelineOptions pipelineOptions,
- DoFn<KV, OutputT> doFn, Coder<WindowedValue<KV>> inputCoder,
- WindowingStrategy<?, ?> windowingStrategy, TupleTag<KV> mainInputTag,
- Collection<PCollectionView<?>> sideInputs, Map<TupleTag, PCollectionView<?>>
- sideInputTagToView, TupleTag<OutputT> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
- super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy,
- mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
- }
-
- @Override
- public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
- if (mainInputTag.equals(tag)) {
- WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
- stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this,
- executorContext.getExecutorsBolt().timerService()));
- stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(),
- kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
- processMainInput(elem);
- } else {
- processSideInput(tag, elem);
- }
- }
-
- @Override
- public void onTimer(Object key, TimerInternals.TimerData timerData) {
- stepContext.setStateInternals(new JStormStateInternals<>(key,
- kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
- super.onTimer(key, timerData);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
deleted file mode 100644
index 5c41bda..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.runners.core.TimerInternals;
-import org.joda.time.Instant;
-
-/**
- * Interface that tracks input watermarks and manages timers in each bolt.
- */
-public interface TimerService extends Serializable {
-
- void init(List<Integer> upStreamTasks);
-
- /**
- *
- * @param task
- * @param inputWatermark
- * @return new watermark if any timer is triggered during the update of watermark, otherwise 0
- */
- long updateInputWatermark(Integer task, long inputWatermark);
-
- long currentInputWatermark();
-
- long currentOutputWatermark();
-
- void clearWatermarkHold(String namespace);
-
- void addWatermarkHold(String namespace, Instant watermarkHold);
-
- void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor);
-
- void fireTimers(long newWatermark);
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
deleted file mode 100644
index 0103095..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.alibaba.jstorm.utils.Pair;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.joda.time.Instant;
-
-/**
- * Default implementation of {@link TimerService}.
- */
-public class TimerServiceImpl implements TimerService {
- private transient ExecutorContext executorContext;
- private transient Map<Integer, DoFnExecutor> idToDoFnExecutor;
-
- private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark =
- new ConcurrentHashMap<>();
- private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>();
- private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>();
- private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>();
- private final transient PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue =
- new PriorityQueue<>();
- private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>>
- timerDataToKeyedExecutors = Maps.newHashMap();
-
- private boolean initialized = false;
-
- public TimerServiceImpl() {
- }
-
- public TimerServiceImpl(ExecutorContext executorContext) {
- this.executorContext = executorContext;
- this.idToDoFnExecutor = executorContext.getExecutorsBolt().getIdToDoFnExecutor();
- }
-
- @Override
- public void init(List<Integer> upStreamTasks) {
- for (Integer task : upStreamTasks) {
- upStreamTaskToInputWatermark.put(task, BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
- inputWatermarks.add(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
- }
- initialized = true;
- }
-
- @Override
- public synchronized long updateInputWatermark(Integer task, long taskInputWatermark) {
- checkState(initialized, "TimerService has not been initialized.");
- Long oldTaskInputWatermark = upStreamTaskToInputWatermark.get(task);
- // Make sure the input watermark don't go backward.
- if (taskInputWatermark > oldTaskInputWatermark) {
- upStreamTaskToInputWatermark.put(task, taskInputWatermark);
- inputWatermarks.add(taskInputWatermark);
- inputWatermarks.remove(oldTaskInputWatermark);
-
- long newLocalInputWatermark = currentInputWatermark();
- if (newLocalInputWatermark > oldTaskInputWatermark) {
- return newLocalInputWatermark;
- }
- }
- return 0;
- }
-
- @Override
- public void fireTimers(long newWatermark) {
- TimerInternals.TimerData timerData;
- while ((timerData = eventTimeTimersQueue.peek()) != null
- && timerData.getTimestamp().getMillis() <= newWatermark) {
- for (Pair<Integer, Object> keyedExecutor : timerDataToKeyedExecutors.get(timerData)) {
- DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst());
- executor.onTimer(keyedExecutor.getSecond(), timerData);
- }
- eventTimeTimersQueue.remove();
- timerDataToKeyedExecutors.remove(timerData);
- }
- }
-
- @Override
- public long currentInputWatermark() {
- return initialized ? inputWatermarks.peek() : BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
- }
-
- @Override
- public long currentOutputWatermark() {
- if (watermarkHolds.isEmpty()) {
- return currentInputWatermark();
- } else {
- return Math.min(currentInputWatermark(), watermarkHolds.peek().getMillis());
- }
- }
-
- @Override
- public void clearWatermarkHold(String namespace) {
- Instant currentHold = namespaceToWatermarkHold.get(namespace);
- if (currentHold != null) {
- watermarkHolds.remove(currentHold);
- namespaceToWatermarkHold.remove(namespace);
- }
- }
-
- @Override
- public void addWatermarkHold(String namespace, Instant watermarkHold) {
- Instant currentHold = namespaceToWatermarkHold.get(namespace);
- if (currentHold == null) {
- namespaceToWatermarkHold.put(namespace, watermarkHold);
- watermarkHolds.add(watermarkHold);
- } else if (currentHold != null && watermarkHold.isBefore(currentHold)) {
- namespaceToWatermarkHold.put(namespace, watermarkHold);
- watermarkHolds.add(watermarkHold);
- watermarkHolds.remove(currentHold);
- }
- }
-
- @Override
- public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor) {
- checkArgument(
- TimeDomain.EVENT_TIME.equals(timerData.getDomain()),
- String.format("Does not support domain: %s.", timerData.getDomain()));
- Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.get(timerData);
- if (keyedExecutors == null) {
- keyedExecutors = Sets.newHashSet();
- eventTimeTimersQueue.add(timerData);
- }
- keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key));
- timerDataToKeyedExecutors.put(timerData, keyedExecutors);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
deleted file mode 100644
index 8dc51b5..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import com.alibaba.jstorm.cache.IKvStore;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.transactional.bolt.ITransactionStatefulBoltExecutor;
-import java.io.IOException;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Transactional executors bolt handles the checkpoint and restore of state and timer.
- */
-public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor {
- private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class);
-
- private static final String TIME_SERVICE_STORE_ID = "timer_service_store";
- private static final String TIMER_SERVICE_KET = "timer_service_key";
-
- private ExecutorsBolt executorsBolt;
- private IKvStoreManager kvStoreManager;
- private IKvStore<String, TimerService> timerServiceStore;
-
- public TxExecutorsBolt(ExecutorsBolt executorsBolt) {
- this.executorsBolt = executorsBolt;
- this.executorsBolt.setStatefulBolt(true);
- }
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- try {
- executorsBolt.prepare(stormConf, context, collector);
- kvStoreManager = executorsBolt.getExecutorContext().getKvStoreManager();
- timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
- } catch (IOException e) {
- LOG.error("Failed to prepare stateful bolt", e);
- throw new RuntimeException(e.getMessage());
- }
- }
-
- @Override
- public void execute(Tuple input) {
- executorsBolt.execute(input);
- }
-
- @Override
- public void cleanup() {
- executorsBolt.cleanup();
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- executorsBolt.declareOutputFields(declarer);
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return executorsBolt.getComponentConfiguration();
- }
-
- @Override
- public void initState(Object userState) {
- LOG.info("Begin to init from state: {}", userState);
- restore(userState);
- }
-
- @Override
- public Object finishBatch(long batchId) {
- try {
- timerServiceStore.put(TIMER_SERVICE_KET, executorsBolt.timerService());
- } catch (IOException e) {
- LOG.error("Failed to store current timer service status", e);
- throw new RuntimeException(e.getMessage());
- }
- kvStoreManager.checkpoint(batchId);
- return null;
- }
-
- @Override
- public Object commit(long batchId, Object state) {
- return kvStoreManager.backup(batchId);
- }
-
- @Override
- public void rollBack(Object userState) {
- LOG.info("Begin to rollback from state: {}", userState);
- restore(userState);
- }
-
- @Override
- public void ackCommit(long batchId, long timeStamp) {
- kvStoreManager.remove(batchId);
- }
-
- private void restore(Object userState) {
- try {
- // restore all states
- kvStoreManager.restore(userState);
-
- // init timer service
- timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
- TimerService timerService = timerServiceStore.get(TIMER_SERVICE_KET);
- if (timerService == null) {
- timerService = executorsBolt.initTimerService();
- }
- executorsBolt.setTimerService(timerService);
- } catch (IOException e) {
- LOG.error("Failed to restore state", e);
- throw new RuntimeException(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
deleted file mode 100644
index 48b410f..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import com.alibaba.jstorm.cache.IKvStore;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.cache.KvStoreManagerFactory;
-import com.alibaba.jstorm.transactional.spout.ITransactionSpoutExecutor;
-import java.io.IOException;
-import java.util.Map;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.slf4j.LoggerFactory;
-
-/**
- * Transactional unbounded source spout handles the checkpoint and restore of state and timer.
- */
-public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor {
- private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class);
-
- private static final String SOURCE_STORE_ID = "SourceCheckpoint";
- private static final String CHECKPOINT_MARK = "CheckpointMark";
-
- private UnboundedSourceSpout sourceSpout;
- private UnboundedSource.UnboundedReader reader;
- private IKvStoreManager kvStoreManager;
- private IKvStore<String, UnboundedSource.CheckpointMark> sourceCheckpointStore;
-
- public TxUnboundedSourceSpout(UnboundedSourceSpout sourceSpout) {
- this.sourceSpout = sourceSpout;
- }
-
- private void restore(Object userState) {
- try {
- kvStoreManager.restore(userState);
- sourceCheckpointStore = kvStoreManager.getOrCreate(SOURCE_STORE_ID);
- UnboundedSource.CheckpointMark checkpointMark = sourceCheckpointStore.get(CHECKPOINT_MARK);
- sourceSpout.createSourceReader(checkpointMark);
- reader = sourceSpout.getUnboundedSourceReader();
- } catch (IOException e) {
- LOG.error("Failed to init state", e);
- throw new RuntimeException(e.getMessage());
- }
- }
-
- @Override
- public void initState(Object userState) {
- restore(userState);
- }
-
- @Override
- public Object finishBatch(long checkpointId) {
- try {
- // Store check point mark from unbounded source reader
- UnboundedSource.CheckpointMark checkpointMark = reader.getCheckpointMark();
- sourceCheckpointStore.put(CHECKPOINT_MARK, checkpointMark);
-
- // checkpoint all kv stores in current manager
- kvStoreManager.checkpoint(checkpointId);
- } catch (IOException e) {
- LOG.error(String.format("Failed to finish batch-%s", checkpointId), e);
- throw new RuntimeException(e.getMessage());
- }
- return null;
- }
-
- @Override
- public Object commit(long batchId, Object state) {
- // backup kv stores to remote state backend
- return kvStoreManager.backup(batchId);
- }
-
- @Override
- public void rollBack(Object userState) {
- restore(userState);
- }
-
- @Override
- public void ackCommit(long batchId, long timeStamp) {
- // remove obsolete state in bolt local and remote state backend
- kvStoreManager.remove(batchId);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- sourceSpout.declareOutputFields(declarer);
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return sourceSpout.getComponentConfiguration();
- }
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- try {
- sourceSpout.open(conf, context, collector);
- String storeName = String.format("task-%s", context.getThisTaskId());
- String storePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
- kvStoreManager = KvStoreManagerFactory.getKvStoreManagerWithMonitor(
- context, storeName, storePath, true);
-
- reader = sourceSpout.getUnboundedSourceReader();
- } catch (IOException e) {
- LOG.error("Failed to open transactional unbounded source spout", e);
- throw new RuntimeException(e.getMessage());
- }
- }
-
- @Override
- public void close() {
- sourceSpout.close();
- }
-
- @Override
- public void activate() {
- sourceSpout.activate();
- }
-
- @Override
- public void deactivate() {
- sourceSpout.deactivate();
- }
-
- @Override
- public void nextTuple() {
- sourceSpout.nextTuple();
- }
-
- @Override
- public void ack(Object msgId) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void fail(Object msgId) {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
deleted file mode 100644
index 690824d..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.tuple.Values;
-import com.alibaba.jstorm.utils.KryoSerializer;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
-import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Spout implementation that wraps a Beam UnboundedSource.
- * TODO: add wrapper to support metrics in UnboundedSource.
- */
-public class UnboundedSourceSpout extends AbstractComponent implements IRichSpout {
- private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
-
- private final String description;
- private final UnboundedSource source;
- private final SerializedPipelineOptions serializedOptions;
- private final TupleTag<?> outputTag;
-
- private transient JStormPipelineOptions pipelineOptions;
- private transient UnboundedSource.UnboundedReader reader;
- private transient SpoutOutputCollector collector;
-
- private volatile boolean hasNextRecord;
- private AtomicBoolean activated = new AtomicBoolean();
-
- private KryoSerializer<WindowedValue> serializer;
-
- private long lastWaterMark = 0L;
-
- public UnboundedSourceSpout(
- String description,
- UnboundedSource source,
- JStormPipelineOptions options,
- TupleTag<?> outputTag) {
- this.description = checkNotNull(description, "description");
- this.source = checkNotNull(source, "source");
- this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options"));
- this.outputTag = checkNotNull(outputTag, "outputTag");
- }
-
- @Override
- public synchronized void close() {
- try {
- activated.set(false);
- this.reader.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void activate() {
- activated.set(true);
-
- }
-
- @Override
- public void deactivate() {
- activated.set(false);
- }
-
- @Override
- public void ack(Object msgId) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void fail(Object msgId) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- try {
- this.collector = collector;
- this.pipelineOptions =
- this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
-
- createSourceReader(null);
-
- this.serializer = new KryoSerializer<>(conf);
- } catch (IOException e) {
- throw new RuntimeException("Unable to create unbounded reader.", e);
- }
- }
-
- public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException {
- if (reader != null) {
- reader.close();
- }
- reader = this.source.createReader(this.pipelineOptions, checkpointMark);
- hasNextRecord = this.reader.start();
- }
-
- @Override
- public synchronized void nextTuple() {
- if (!activated.get()) {
- return;
- }
- try {
- if (!hasNextRecord) {
- hasNextRecord = reader.advance();
- }
-
- while (hasNextRecord && activated.get()) {
- Object value = reader.getCurrent();
- Instant timestamp = reader.getCurrentTimestamp();
-
- WindowedValue wv =
- WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
- LOG.debug("Source output: " + wv.getValue());
- if (keyedEmit(outputTag.getId())) {
- KV kv = (KV) wv.getValue();
- // Convert WindowedValue<KV> to <K, WindowedValue<V>>
- byte[] immutableValue = serializer.serialize(wv.withValue(kv.getValue()));
- collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableValue));
- } else {
- byte[] immutableValue = serializer.serialize(wv);
- collector.emit(outputTag.getId(), new Values(immutableValue));
- }
-
- // move to next record
- hasNextRecord = reader.advance();
- }
-
- Instant waterMark = reader.getWatermark();
- if (waterMark != null && lastWaterMark < waterMark.getMillis()) {
- lastWaterMark = waterMark.getMillis();
- collector.flush();
- collector.emit(CommonInstance.BEAM_WATERMARK_STREAM_ID, new Values(waterMark.getMillis()));
- LOG.debug("Source output: WM-{}", waterMark.toDateTime());
- }
- } catch (IOException e) {
- throw new RuntimeException("Exception reading values from source.", e);
- }
- }
-
- public UnboundedSource getUnboundedSource() {
- return source;
- }
-
- public UnboundedSource.UnboundedReader getUnboundedSourceReader() {
- return reader;
- }
-
- @Override
- public String toString() {
- return description;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
deleted file mode 100644
index 4320967..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * JStorm {@link Executor} for {@link View}.
- */
-public class ViewExecutor implements Executor {
-
- private final String description;
- private final TupleTag outputTag;
- private ExecutorsBolt executorsBolt;
-
- public ViewExecutor(String description, TupleTag outputTag) {
- this.description = description;
- this.outputTag = outputTag;
- }
-
- @Override
- public void init(ExecutorContext context) {
- this.executorsBolt = context.getExecutorsBolt();
- }
-
- @Override
- public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
- executorsBolt.processExecutorElem(outputTag, elem);
- }
-
- @Override
- public void cleanup() {
- }
-
- @Override
- public String toString() {
- return description;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
deleted file mode 100644
index 3cd0aa9..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}.
- * @param <T>
- * @param <W>
- */
-public class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor {
- private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class);
-
- private final String description;
- private WindowFn<T, W> windowFn;
- private ExecutorsBolt executorsBolt;
- private TupleTag outputTag;
-
- class JStormAssignContext<InputT, W extends BoundedWindow>
- extends WindowFn<InputT, W>.AssignContext {
- private final WindowedValue<InputT> value;
-
- JStormAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
- fn.super();
- checkArgument(
- Iterables.size(value.getWindows()) == 1,
- String.format(
- "%s passed to window assignment must be in a single window, but it was in %s: %s",
- WindowedValue.class.getSimpleName(),
- Iterables.size(value.getWindows()),
- value.getWindows()));
- this.value = value;
- }
-
- @Override
- public InputT element() {
- return value.getValue();
- }
-
- @Override
- public Instant timestamp() {
- return value.getTimestamp();
- }
-
- @Override
- public BoundedWindow window() {
- return Iterables.getOnlyElement(value.getWindows());
- }
- }
-
- public WindowAssignExecutor(String description, WindowFn<T, W> windowFn, TupleTag outputTag) {
- this.description = description;
- this.windowFn = windowFn;
- this.outputTag = outputTag;
- }
-
- @Override
- public void init(ExecutorContext context) {
- this.executorsBolt = context.getExecutorsBolt();
- }
-
- @Override
- public void process(TupleTag tag, WindowedValue elem) {
- Collection<W> windows = null;
- try {
- windows = windowFn.assignWindows(new JStormAssignContext<>(windowFn, elem));
- for (W window : windows) {
- executorsBolt.processExecutorElem(
- outputTag,
- WindowedValue.of(elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
- }
- } catch (Exception e) {
- LOG.warn("Failed to assign windows for elem=" + elem, e);
- }
- }
-
- @Override
- public void cleanup() {
- }
-
-
- @Override
- public String toString() {
- return description;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
deleted file mode 100644
index df54383..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime.state;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.alibaba.jstorm.cache.ComposedKey;
-import com.alibaba.jstorm.cache.IKvStore;
-import com.alibaba.jstorm.cache.KvStoreIterable;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of {@link BagState} in JStorm runner.
- *
- * <p>Elements are stored in {@code kvState} under composed keys of (key, namespace, index). The
- * index of the most recently added element is stored in {@code stateInfoKvState} under the
- * composed key (key, namespace).
- *
- * @param <K> type of the key this state belongs to
- * @param <T> type of the elements in the bag
- */
-class JStormBagState<K, T> implements BagState<T> {
-  private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class);
-
-  @Nullable
-  private final K key;
-  private final StateNamespace namespace;
-  private final IKvStore<ComposedKey, T> kvState;
-  private final IKvStore<ComposedKey, Object> stateInfoKvState;
-  /** Index the next added element is stored under; equals the number of stored elements. */
-  private int elemIndex;
-
-  /**
-   * @param key key this state belongs to; may be {@code null}
-   * @param namespace namespace of the state
-   * @param kvState store holding the elements
-   * @param stateInfoKvState store holding the index of the last added element
-   * @throws IOException if the last index cannot be read from {@code stateInfoKvState}
-   * @throws NullPointerException if namespace or either store is {@code null}
-   */
-  public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState,
-      IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
-    this.key = key;
-    this.namespace = checkNotNull(namespace, "namespace");
-    this.kvState = checkNotNull(kvState, "kvState");
-    this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
-
-    // The store holds the index of the last element, so the next free index is one past it.
-    Integer lastIndex = (Integer) stateInfoKvState.get(getComposedKey());
-    this.elemIndex = lastIndex != null ? lastIndex + 1 : 0;
-  }
-
-  /**
-   * Appends the element and records its index as the last used one.
-   *
-   * @throws RuntimeException wrapping the {@link IOException} if a store write fails
-   */
-  @Override
-  public void add(T input) {
-    try {
-      kvState.put(getComposedKey(elemIndex), input);
-      stateInfoKvState.put(getComposedKey(), elemIndex);
-      elemIndex++;
-    } catch (IOException e) {
-      // Wrap the IOException itself; its own cause may be null and would lose the failure.
-      throw new RuntimeException(e);
-    }
-  }
-
-  /** Returns a state whose {@code read()} reports whether no element is currently stored. */
-  @Override
-  public ReadableState<Boolean> isEmpty() {
-    return new ReadableState<Boolean>() {
-      @Override
-      public Boolean read() {
-        return elemIndex <= 0;
-      }
-
-      @Override
-      public ReadableState<Boolean> readLater() {
-        // TODO: support prefetch.
-        return this;
-      }
-    };
-  }
-
-  /** Returns an iterable over the stored elements in insertion order. */
-  @Override
-  public Iterable<T> read() {
-    return new BagStateIterable(elemIndex);
-  }
-
-  @Override
-  public BagState<T> readLater() {
-    // TODO: support prefetch.
-    return this;
-  }
-
-  /**
-   * Removes every stored element and the recorded last index.
-   *
-   * @throws RuntimeException wrapping the {@link IOException} if a store removal fails
-   */
-  @Override
-  public void clear() {
-    try {
-      for (int i = 0; i < elemIndex; i++) {
-        kvState.remove(getComposedKey(i));
-      }
-      stateInfoKvState.remove(getComposedKey());
-      elemIndex = 0;
-    } catch (IOException e) {
-      // Wrap the IOException itself; its own cause may be null and would lose the failure.
-      throw new RuntimeException(e);
-    }
-  }
-
-  /** Composed key under which the last element index is stored. */
-  private ComposedKey getComposedKey() {
-    return ComposedKey.of(key, namespace);
-  }
-
-  /** Composed key under which the element with the given index is stored. */
-  private ComposedKey getComposedKey(int elemIndex) {
-    return ComposedKey.of(key, namespace, elemIndex);
-  }
-
-  /**
-   * Implementation of Bag state Iterable.
-   */
-  private class BagStateIterable implements KvStoreIterable<T> {
-
-    /**
-     * Iterator over the elements; the element count is re-read from {@code stateInfoKvState} when
-     * the iterator is created.
-     */
-    private class BagStateIterator implements Iterator<T> {
-      private final int size;
-      private int cursor = 0;
-
-      BagStateIterator() {
-        Integer lastIndex = null;
-        try {
-          lastIndex = (Integer) stateInfoKvState.get(getComposedKey());
-        } catch (IOException e) {
-          // Best effort: a failed lookup yields an empty iterator, but the cause is logged.
-          LOG.error("Failed to get elemIndex for key={}", getComposedKey(), e);
-        }
-        this.size = lastIndex != null ? lastIndex + 1 : 0;
-      }
-
-      @Override
-      public boolean hasNext() {
-        return cursor < size;
-      }
-
-      @Override
-      public T next() {
-        if (cursor >= size) {
-          throw new NoSuchElementException();
-        }
-
-        T value = null;
-        try {
-          value = kvState.get(getComposedKey(cursor));
-        } catch (IOException e) {
-          // Best effort: a failed read yields null for this element, but the cause is logged.
-          LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor), e);
-        }
-        cursor++;
-        return value;
-      }
-
-      @Override
-      public void remove() {
-        throw new UnsupportedOperationException();
-      }
-    }
-
-    /** Element count at the time the iterable was created; iterators re-read it themselves. */
-    private final int size;
-
-    BagStateIterable(int size) {
-      this.size = size;
-    }
-
-    @Override
-    public Iterator<T> iterator() {
-      return new BagStateIterator();
-    }
-
-    @Override
-    public String toString() {
-      return String.format("BagStateIterable: composedKey=%s", getComposedKey());
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
deleted file mode 100644
index 7c6a239..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime.state;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.CombiningState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.apache.beam.sdk.transforms.Combine;
-
-/**
- * JStorm implementation of {@link CombiningState}.
- */
-public class JStormCombiningState<InputT, AccumT, OutputT>
- implements CombiningState<InputT, AccumT, OutputT> {
-
- @Nullable
- private final BagState<AccumT> accumBagState;
- private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
-
- JStormCombiningState(
- BagState<AccumT> accumBagState,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
- this.accumBagState = checkNotNull(accumBagState, "accumBagState");
- this.combineFn = checkNotNull(combineFn, "combineFn");
- }
-
- @Override
- public AccumT getAccum() {
- // TODO: replacing the accumBagState with the merged accum.
- return combineFn.mergeAccumulators(accumBagState.read());
- }
-
- @Override
- public void addAccum(AccumT accumT) {
- accumBagState.add(accumT);
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
- return combineFn.mergeAccumulators(iterable);
- }
-
- @Override
- public void add(InputT input) {
- accumBagState.add(
- combineFn.addInput(combineFn.createAccumulator(), input));
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return accumBagState.isEmpty();
- }
-
- @Override
- public OutputT read() {
- return combineFn.extractOutput(
- combineFn.mergeAccumulators(accumBagState.read()));
- }
-
- @Override
- public CombiningState<InputT, AccumT, OutputT> readLater() {
- // TODO: support prefetch.
- return this;
- }
-
- @Override
- public void clear() {
- accumBagState.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
deleted file mode 100644
index ac3f91f..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime.state;
-
-import com.alibaba.jstorm.cache.IKvStore;
-import java.io.IOException;
-import java.util.Map;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of {@link MapState} in JStorm runner.
- *
- * <p>All operations go straight to the given {@link IKvStore}. Any {@link IOException} raised by
- * the store is logged and rethrown as a {@link RuntimeException} carrying it as the cause.
- *
- * @param <K> type of the map keys
- * @param <V> type of the map values
- */
-public class JStormMapState<K, V> implements MapState<K, V> {
-  private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class);
-
-  private final K key;
-  private final StateNamespace namespace;
-  private IKvStore<K, V> kvStore;
-
-  /**
-   * @param key key this state belongs to
-   * @param namespace namespace of the state
-   * @param kvStore store backing the map
-   */
-  public JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
-    this.key = key;
-    this.namespace = namespace;
-    this.kvStore = kvStore;
-  }
-
-  /** Stores the value under the given key. */
-  @Override
-  public void put(K var1, V var2) {
-    try {
-      kvStore.put(var1, var2);
-    } catch (IOException e) {
-      throw reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e);
-    }
-  }
-
-  /**
-   * Stores the value only if the store currently returns {@code null} for the key.
-   *
-   * @return a state holding the value found for the key before the call, or {@code null} if there
-   *     was none and the given value was stored
-   */
-  @Override
-  public ReadableState<V> putIfAbsent(K var1, V var2) {
-    try {
-      V existing = kvStore.get(var1);
-      if (existing == null) {
-        kvStore.put(var1, var2);
-      }
-      return new MapReadableState<>(existing);
-    } catch (IOException e) {
-      throw reportError(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e);
-    }
-  }
-
-  /** Removes the entry for the given key. */
-  @Override
-  public void remove(K var1) {
-    try {
-      kvStore.remove(var1);
-    } catch (IOException e) {
-      throw reportError(String.format("Failed to remove key=%s", var1), e);
-    }
-  }
-
-  /** Returns a state holding the value the store returns for the given key. */
-  @Override
-  public ReadableState<V> get(K var1) {
-    try {
-      return new MapReadableState<>(kvStore.get(var1));
-    } catch (IOException e) {
-      throw reportError(String.format("Failed to get value for key=%s", var1), e);
-    }
-  }
-
-  /** Returns a state holding the keys of the store. */
-  @Override
-  public ReadableState<Iterable<K>> keys() {
-    try {
-      return new MapReadableState<>(kvStore.keys());
-    } catch (IOException e) {
-      throw reportError("Failed to get keys", e);
-    }
-  }
-
-  /** Returns a state holding the values of the store. */
-  @Override
-  public ReadableState<Iterable<V>> values() {
-    try {
-      return new MapReadableState<>(kvStore.values());
-    } catch (IOException e) {
-      throw reportError("Failed to get values", e);
-    }
-  }
-
-  /** Returns a state holding the entries of the store. */
-  @Override
-  public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
-    try {
-      return new MapReadableState<>(kvStore.entries());
-    } catch (IOException e) {
-      throw reportError("Failed to get entries", e);
-    }
-  }
-
-  /** Removes every key currently reported by the store. */
-  @Override
-  public void clear() {
-    try {
-      Iterable<K> keys = kvStore.keys();
-      kvStore.removeBatch(keys);
-    } catch (IOException e) {
-      throw reportError("Failed to clear map state", e);
-    }
-  }
-
-  /**
-   * Logs the failure and builds the exception to throw for it.
-   *
-   * @param errorInfo description of the failed operation
-   * @param e the store failure
-   * @return a {@link RuntimeException} with {@code errorInfo} as message and {@code e} as cause
-   */
-  private RuntimeException reportError(String errorInfo, IOException e) {
-    LOG.error(errorInfo, e);
-    return new RuntimeException(errorInfo, e);
-  }
-
-  /** {@link ReadableState} that simply holds a value computed up front. */
-  private class MapReadableState<T> implements ReadableState<T> {
-    private T value;
-
-    public MapReadableState(T value) {
-      this.value = value;
-    }
-
-    @Override
-    public T read() {
-      return value;
-    }
-
-    @Override
-    public ReadableState<T> readLater() {
-      return this;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
deleted file mode 100644
index 80ef3a2..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime.state;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.alibaba.jstorm.cache.ComposedKey;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import java.io.IOException;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.CombiningState;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.SetState;
-import org.apache.beam.sdk.state.State;
-import org.apache.beam.sdk.state.StateBinder;
-import org.apache.beam.sdk.state.StateContext;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.joda.time.Instant;
-
-/**
- * JStorm implementation of {@link StateInternals}.
- *
- * <p>State cells are backed by KV stores obtained from an {@link IKvStoreManager}. The store id
- * of a cell is {@code <stateId>-<executorId>}; bag-based cells use a second store whose id is
- * prefixed with {@code state-info:}.
- *
- * @param <K> type of the key this state belongs to
- */
-public class JStormStateInternals<K> implements StateInternals {
-
-  /** Prefix of the id of the bookkeeping store used by bag-based state. */
-  private static final String STATE_INFO = "state-info:";
-
-  @Nullable
-  private final K key;
-  private final IKvStoreManager kvStoreManager;
-  private final TimerService timerService;
-  private final int executorId;
-
-  /**
-   * @param key key this state belongs to; may be {@code null}
-   * @param kvStoreManager manager used to obtain the backing stores
-   * @param timerService timer service handed to watermark hold state
-   * @param executorId id appended to every state id to form the store id
-   * @throws NullPointerException if {@code kvStoreManager} or {@code timerService} is
-   *     {@code null}
-   */
-  public JStormStateInternals(K key, IKvStoreManager kvStoreManager,
-      TimerService timerService, int executorId) {
-    this.key = key;
-    this.kvStoreManager = checkNotNull(kvStoreManager, "kvStoreManager");
-    this.timerService = checkNotNull(timerService, "timerService");
-    this.executorId = executorId;
-  }
-
-  @Nullable
-  @Override
-  public K getKey() {
-    return key;
-  }
-
-  /**
-   * Same as {@link #state(StateNamespace, StateTag)}; the state context is ignored.
-   */
-  @Override
-  public <T extends State> T state(
-      StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
-    // throw new UnsupportedOperationException("StateContext is not supported.");
-    // TODO:
-    // Same implementation as state() which is without StateContext. This might be updated after
-    // we figure out if we really need StateContext for JStorm state internals.
-    return state(namespace, address);
-  }
-
-  /**
-   * Binds the spec of the given tag to a KV-store backed state cell.
-   *
-   * <p>Set state and combining state with context are not supported and raise
-   * {@link UnsupportedOperationException}. An {@link IOException} from the store manager is
-   * rethrown as a {@link RuntimeException} carrying it as the cause.
-   */
-  @Override
-  public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) {
-    return address.getSpec().bind(address.getId(), new StateBinder() {
-      @Override
-      public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) {
-        try {
-          return new JStormValueState<>(
-              getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)));
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
-        try {
-          return new JStormBagState(
-              getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)),
-              kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-          String id,
-          StateSpec<MapState<KeyT, ValueT>> spec,
-          Coder<KeyT> mapKeyCoder,
-          Coder<ValueT> mapValueCoder) {
-        try {
-          return new JStormMapState<>(
-              getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id)));
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT> CombiningState bindCombining(
-          String id,
-          StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
-          Coder<AccumT> accumCoder,
-          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-        try {
-          // Combining state keeps its accumulators in a bag.
-          BagState<AccumT> accumBagState = new JStormBagState(
-              getKey(), namespace,
-              kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)),
-              kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
-          return new JStormCombiningState<>(accumBagState, combineFn);
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-
-      @Override
-      public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
-      bindCombiningWithContext(
-          String id,
-          StateSpec<CombiningState<InputT, AccumT, OutputT>> stateSpec, Coder<AccumT> coder,
-          CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public WatermarkHoldState bindWatermark(
-          String id,
-          StateSpec<WatermarkHoldState> spec,
-          final TimestampCombiner timestampCombiner) {
-        try {
-          BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState(
-              getKey(), namespace,
-              kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)),
-              kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
-
-          // Holds are combined pairwise with the timestamp combiner.
-          Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn =
-              new BinaryCombineFn<Instant>() {
-                @Override
-                public Instant apply(Instant left, Instant right) {
-                  return timestampCombiner.combine(left, right);
-                }
-              };
-          return new JStormWatermarkHoldState(
-              namespace,
-              new JStormCombiningState<>(
-                  accumBagState,
-                  outputTimeCombineFn),
-              timestampCombiner,
-              timerService);
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    });
-  }
-
-  /** Builds the store id {@code <stateId>-<executorId>} for the given state id. */
-  private String getStoreId(String stateId) {
-    return String.format("%s-%s", stateId, executorId);
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
deleted file mode 100644
index 79ff6b4..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime.state;
-
-import com.alibaba.jstorm.cache.ComposedKey;
-import com.alibaba.jstorm.cache.IKvStore;
-import java.io.IOException;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.ValueState;
-
-/**
- * JStorm implementation of {@link ValueState}.
- *
- * <p>The value is kept in an {@link IKvStore} under a {@link ComposedKey} built from the state
- * key and the {@link StateNamespace}.
- *
- * @param <K> type of the key this state is scoped to
- * @param <T> type of the stored value
- */
-public class JStormValueState<K, T> implements ValueState<T> {
-
-  @Nullable
-  private final K key;
-  private final StateNamespace namespace;
-  private final IKvStore<ComposedKey, T> kvState;
-
-  JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
-    this.key = key;
-    this.namespace = namespace;
-    this.kvState = kvState;
-  }
-
-  /**
-   * Stores {@code t} under the composed key.
-   *
-   * @throws RuntimeException wrapping the {@link IOException} raised by the store
-   */
-  @Override
-  public void write(T t) {
-    try {
-      kvState.put(getComposedKey(), t);
-    } catch (IOException e) {
-      // Chain the IOException so the underlying store failure stays visible.
-      throw new RuntimeException(String.format(
-          "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t), e);
-    }
-  }
-
-  /**
-   * Returns whatever the store holds under the composed key.
-   *
-   * @throws RuntimeException wrapping the {@link IOException} raised by the store
-   */
-  @Override
-  public T read() {
-    try {
-      return kvState.get(getComposedKey());
-    } catch (IOException e) {
-      throw new RuntimeException(String.format(
-          "Failed to read key: %s, namespace: %s.", key, namespace), e);
-    }
-  }
-
-  @Override
-  public ValueState<T> readLater() {
-    // TODO: support prefetch.
-    return this;
-  }
-
-  /**
-   * Removes the entry stored under the composed key.
-   *
-   * @throws RuntimeException wrapping the {@link IOException} raised by the store
-   */
-  @Override
-  public void clear() {
-    try {
-      kvState.remove(getComposedKey());
-    } catch (IOException e) {
-      throw new RuntimeException(String.format(
-          "Failed to clear key: %s, namespace: %s.", key, namespace), e);
-    }
-  }
-
-  /** Combines the state key and namespace into the key used against the store. */
-  private ComposedKey getComposedKey() {
-    return ComposedKey.of(key, namespace);
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
deleted file mode 100644
index dc3ba43..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime.state;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
-import org.apache.beam.sdk.state.GroupingState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.joda.time.Instant;
-
-/**
- * JStorm implementation of {@link WatermarkHoldState}.
- *
- * <p>Every hold is reported to the {@link TimerService} under the namespace's string key and is
- * also recorded in the wrapped {@link GroupingState}, which serves all reads.
- */
-public class JStormWatermarkHoldState implements WatermarkHoldState {
-
-  private final StateNamespace namespace;
-  private final GroupingState<Instant, Instant> watermarkHoldsState;
-  private final TimestampCombiner timestampCombiner;
-  private final TimerService timerService;
-
-  /** Creates the state; every argument must be non-null. */
-  JStormWatermarkHoldState(
-      StateNamespace namespace,
-      GroupingState<Instant, Instant> watermarkHoldsState,
-      TimestampCombiner timestampCombiner,
-      TimerService timerService) {
-    this.namespace = checkNotNull(namespace, "namespace");
-    this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
-    this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
-    this.timerService = checkNotNull(timerService, "timerService");
-  }
-
-  @Override
-  public TimestampCombiner getTimestampCombiner() {
-    return timestampCombiner;
-  }
-
-  /** Registers the hold with the timer service first, then records it in the grouping state. */
-  @Override
-  public void add(Instant instant) {
-    final String holdKey = namespace.stringKey();
-    timerService.addWatermarkHold(holdKey, instant);
-    watermarkHoldsState.add(instant);
-  }
-
-  @Override
-  public ReadableState<Boolean> isEmpty() {
-    final ReadableState<Boolean> emptiness = watermarkHoldsState.isEmpty();
-    return emptiness;
-  }
-
-  @Override
-  public Instant read() {
-    final Instant hold = watermarkHoldsState.read();
-    return hold;
-  }
-
-  @Override
-  public WatermarkHoldState readLater() {
-    // TODO: support prefetch.
-    return this;
-  }
-
-  /** Clears the hold in the timer service first, then empties the grouping state. */
-  @Override
-  public void clear() {
-    final String holdKey = namespace.stringKey();
-    timerService.clearWatermarkHold(holdKey);
-    watermarkHoldsState.clear();
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
deleted file mode 100644
index 184a957..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime.timer;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
-import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.joda.time.Instant;
-
-/**
- * JStorm implementation of {@link TimerInternals}.
- *
- * <p>Timers are handed to the {@link TimerService} together with the key and the owning
- * {@link DoFnExecutor}. Deleting a timer is not supported.
- *
- * @param <K> type of the key timers are set for
- */
-public class JStormTimerInternals<K> implements TimerInternals {
-
-  /** Message shared by every {@code deleteTimer} overload. */
-  private static final String TIMER_CANCEL_UNSUPPORTED =
-      "Canceling of a timer is not yet supported.";
-
-  private final K key;
-  private final DoFnExecutor<?, ?> doFnExecutor;
-  private final TimerService timerService;
-
-  /** Creates the internals; {@code key} may be null, the other arguments must not be. */
-  public JStormTimerInternals(
-      @Nullable K key, DoFnExecutor<?, ?> doFnExecutor, TimerService timerService) {
-    this.key = key;
-    this.doFnExecutor = checkNotNull(doFnExecutor, "doFnExecutor");
-    this.timerService = checkNotNull(timerService, "timerService");
-  }
-
-  /** Wraps the arguments in a {@link TimerData} and delegates to {@link #setTimer(TimerData)}. */
-  @Override
-  public void setTimer(
-      StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
-    TimerData timerData = TimerData.of(timerId, namespace, target, timeDomain);
-    setTimer(timerData);
-  }
-
-  @Override
-  @Deprecated
-  public void setTimer(TimerData timerData) {
-    timerService.setTimer(key, timerData, doFnExecutor);
-  }
-
-  @Override
-  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
-    throw new UnsupportedOperationException(TIMER_CANCEL_UNSUPPORTED);
-  }
-
-  @Override
-  @Deprecated
-  public void deleteTimer(StateNamespace namespace, String timerId) {
-    throw new UnsupportedOperationException(TIMER_CANCEL_UNSUPPORTED);
-  }
-
-  @Override
-  @Deprecated
-  public void deleteTimer(TimerData timerData) {
-    throw new UnsupportedOperationException(TIMER_CANCEL_UNSUPPORTED);
-  }
-
-  @Override
-  public Instant currentProcessingTime() {
-    return Instant.now();
-  }
-
-  /** Always returns null. */
-  @Override
-  @Nullable
-  public Instant currentSynchronizedProcessingTime() {
-    return null;
-  }
-
-  /** Wraps the timer service's current input watermark in an {@link Instant}. */
-  @Override
-  public Instant currentInputWatermarkTime() {
-    return new Instant(timerService.currentInputWatermark());
-  }
-
-  /** Wraps the timer service's current output watermark in an {@link Instant}. */
-  @Override
-  @Nullable
-  public Instant currentOutputWatermarkTime() {
-    return new Instant(timerService.currentOutputWatermark());
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
deleted file mode 100644
index 7e7a54a..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Translates a {@link Read.Bounded} into a Storm spout.
- *
- * <p>The bounded source is wrapped in a
- * {@link UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter} and served by an
- * {@link UnboundedSourceSpout}.
- *
- * @param <T> element type produced by the source
- */
-public class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> {
-
-  @Override
-  public void translateNode(Read.Bounded<T> transform, TranslationContext context) {
-    TranslationContext.UserGraphContext graph = context.getUserGraphContext();
-    String transformDescription =
-        describeTransform(transform, graph.getInputs(), graph.getOutputs());
-
-    TupleTag<?> tag = graph.getOutputTag();
-    PValue value = graph.getOutput();
-
-    // NOTE(review): the adapter is used as a raw type here; confirm the spout's constructor
-    // signature before parameterizing it.
-    UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter adapter =
-        new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource());
-    UnboundedSourceSpout sourceSpout =
-        new UnboundedSourceSpout(transformDescription, adapter, graph.getOptions(), tag);
-
-    // The spout is registered against the tagged output it feeds.
-    context.getExecutionGraphContext().registerSpout(sourceSpout, TaggedPValue.of(tag, value));
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
deleted file mode 100644
index 44ce8d8..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import com.google.common.collect.Maps;
-import java.util.Map;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.FlattenExecutor;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Translates a {@link Flatten} to a JStorm {@link FlattenExecutor}.
- *
- * @param <V> element type of the flattened collections
- */
-public class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollections<V>> {
-
-  /**
-   * Registers a {@link FlattenExecutor} whose inputs are the expanded inputs of the transform.
-   *
-   * @param transform the flatten transform being translated
-   * @param context translation context providing the user graph and receiving the executor
-   */
-  @Override
-  public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) {
-    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-
-    // Since a new tag is created in PCollectionList, retrieve the real tag here.
-    Map<TupleTag<?>, PValue> inputs = Maps.newHashMap();
-    for (PValue input : userGraphContext.getInputs().values()) {
-      // Only expand() is needed, so a wildcard cast keeps the runtime PCollection check
-      // without an unchecked conversion.
-      PCollection<?> collection = (PCollection<?>) input;
-      inputs.putAll(collection.expand());
-    }
-    String description = describeTransform(transform, inputs, userGraphContext.getOutputs());
-    FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
-    context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
deleted file mode 100644
index 85cb85d..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import com.google.common.collect.Lists;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * Translates a {@link GroupByKey} to a JStorm {@link GroupByWindowExecutor}.
- *
- * @param <K> key type of the grouped elements
- * @param <V> value type of the grouped elements
- */
-public class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> {
-  // Details of the transform; overwritten on every translateNode call.
-  protected PCollection<KV<K, V>> input;
-  protected PCollection<KV<K, Iterable<V>>> output;
-  protected List<TupleTag<?>> inputTags;
-  protected TupleTag<KV<K, Iterable<V>>> mainOutputTag;
-  protected List<TupleTag<?>> sideOutputTags;
-  protected List<PCollectionView<?>> sideInputs;
-  protected WindowingStrategy<?, ?> windowingStrategy;
-
-  /**
-   * Captures the transform's inputs, outputs and windowing strategy in the fields above, then
-   * registers a {@link GroupByWindowExecutor} with the context.
-   */
-  @Override
-  public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
-    TranslationContext.UserGraphContext graph = context.getUserGraphContext();
-    String transformDescription =
-        describeTransform(transform, graph.getInputs(), graph.getOutputs());
-
-    // Input and output collections of the transform.
-    this.input = (PCollection<KV<K, V>>) graph.getInput();
-    this.output = (PCollection<KV<K, Iterable<V>>>) graph.getOutput();
-
-    // Tags: the side output list starts empty.
-    this.inputTags = graph.getInputTags();
-    this.mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) graph.getOutputTag();
-    this.sideOutputTags = Lists.newArrayList();
-
-    // Side inputs are always empty here; the windowing strategy is taken from the input.
-    this.sideInputs = Collections.<PCollectionView<?>>emptyList();
-    this.windowingStrategy = this.input.getWindowingStrategy();
-
-    GroupByWindowExecutor<K, V> executor = new GroupByWindowExecutor<>(
-        graph.getStepName(),
-        transformDescription,
-        context,
-        context.getUserGraphContext().getOptions(),
-        this.windowingStrategy,
-        this.mainOutputTag,
-        this.sideOutputTags);
-    context.addTransformExecutor(executor);
-  }
-}