You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/03/28 08:07:13 UTC
[1/9] storm git commit: STORM-676 Upmerged and resolved conflicts
Repository: storm
Updated Branches:
refs/heads/1.x-branch 1783d7ae9 -> b0db246ad
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/TridentBatchTuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/TridentBatchTuple.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/TridentBatchTuple.java
new file mode 100644
index 0000000..13643db
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/TridentBatchTuple.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * Value holder that groups a {@link TridentTuple} with the effective batch id, timestamp and tuple index it is
+ * associated with. Every field is assigned exactly once, at construction time.
+ */
+public class TridentBatchTuple {
+    final String effectiveBatchId;
+    final long timeStamp;
+    final int tupleIndex;
+    final TridentTuple tridentTuple;
+
+    /**
+     * Creates an instance carrying all four values.
+     */
+    public TridentBatchTuple(String effectiveBatchId, long timeStamp, int tupleIndex, TridentTuple tridentTuple) {
+        this.tridentTuple = tridentTuple;
+        this.tupleIndex = tupleIndex;
+        this.timeStamp = timeStamp;
+        this.effectiveBatchId = effectiveBatchId;
+    }
+
+    /**
+     * Creates an instance without a tuple; {@code tridentTuple} is left {@code null}.
+     */
+    public TridentBatchTuple(String effectiveBatchId, long timeStamp, int tupleIndex) {
+        this(effectiveBatchId, timeStamp, tupleIndex, null);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
new file mode 100644
index 0000000..c2d9362
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.planner.ProcessorContext;
+import org.apache.storm.trident.planner.TridentProcessor;
+import org.apache.storm.trident.planner.processor.FreshCollector;
+import org.apache.storm.trident.planner.processor.TridentContext;
+import org.apache.storm.trident.spout.IBatchID;
+import org.apache.storm.trident.tuple.ConsList;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.tuple.TridentTupleView;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * {@code TridentProcessor} implementation for windowing operations on trident stream.
+ * <p>
+ * Tuples of a batch are buffered in the per-batch processor state and handed to an {@link ITridentWindowManager}
+ * when the batch finishes. The aggregated results of the pending window triggers are then emitted, each one prefixed
+ * with a {@link TriggerInfo}. The ids of the emitted triggers are written to the {@link WindowsStore} under a key
+ * derived from the batch transaction id, and a retried batch attempt reads them back from there.
+ */
+public class WindowTridentProcessor implements TridentProcessor {
+    private static final Logger log = LoggerFactory.getLogger(WindowTridentProcessor.class);
+
+    public static final String TRIGGER_INPROCESS_PREFIX = "tip" + WindowsStore.KEY_SEPARATOR;
+    public static final String TRIGGER_PREFIX = "tr" + WindowsStore.KEY_SEPARATOR;
+    public static final String TRIGGER_COUNT_PREFIX = "tc" + WindowsStore.KEY_SEPARATOR;
+
+    public static final String TRIGGER_FIELD_NAME = "_task_info";
+    // Upper-case 'L' suffix: a lower-case 'l' is easily misread as the digit '1'.
+    public static final long DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT = 100L;
+
+    private final String windowId;
+    private final Fields inputFields;
+    private final Aggregator aggregator;
+    private final boolean storeTuplesInStore;
+
+    private String windowTriggerInprocessId;
+    private WindowConfig windowConfig;
+    private WindowsStoreFactory windowStoreFactory;
+    private WindowsStore windowStore;
+
+    private Map conf;
+    private TopologyContext topologyContext;
+    private FreshCollector collector;
+    private TridentTupleView.ProjectionFactory projection;
+    private TridentContext tridentContext;
+    private ITridentWindowManager tridentWindowManager;
+    private String windowTaskId;
+
+    /**
+     * Stores the given configuration; no resources are created until {@link #prepare} is called.
+     *
+     * @param windowConfig       window configuration passed on to the window manager
+     * @param uniqueWindowId     id used as the leading part of this processor's store keys
+     * @param windowStoreFactory factory used in {@code prepare} to create the {@link WindowsStore}
+     * @param inputFields        fields projected out of every incoming tuple
+     * @param aggregator         aggregator passed on to the window manager
+     * @param storeTuplesInStore {@code true} to use a {@code StoreBasedTridentWindowManager}, {@code false} to use
+     *                           an {@code InMemoryTridentWindowManager}
+     */
+    public WindowTridentProcessor(WindowConfig windowConfig, String uniqueWindowId, WindowsStoreFactory windowStoreFactory,
+                                  Fields inputFields, Aggregator aggregator, boolean storeTuplesInStore) {
+
+        this.windowConfig = windowConfig;
+        this.windowId = uniqueWindowId;
+        this.windowStoreFactory = windowStoreFactory;
+        this.inputFields = inputFields;
+        this.aggregator = aggregator;
+        this.storeTuplesInStore = storeTuplesInStore;
+    }
+
+    /**
+     * Creates the collector, projection, store and window manager used by this processor.
+     *
+     * @throws RuntimeException if the trident context does not have exactly one parent tuple factory
+     */
+    @Override
+    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
+        this.conf = conf;
+        this.topologyContext = context;
+        List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
+        if (parents.size() != 1) {
+            throw new RuntimeException("Aggregation related operation can only have one parent");
+        }
+
+        Long maxTuplesCacheSize = getWindowTuplesCacheSize(conf);
+
+        this.tridentContext = tridentContext;
+        collector = new FreshCollector(tridentContext);
+        projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);
+
+        windowStore = windowStoreFactory.create();
+        // Key prefix of the form <windowId><sep><taskId><sep>; all keys of this task start with it.
+        windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
+        windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);
+
+        tridentWindowManager = storeTuplesInStore ?
+                new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields)
+                : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector());
+
+        tridentWindowManager.prepare();
+    }
+
+    /**
+     * Returns {@link #TRIGGER_INPROCESS_PREFIX} followed by the given window task id.
+     */
+    public static String getWindowTriggerInprocessIdPrefix(String windowTaskId) {
+        return TRIGGER_INPROCESS_PREFIX + windowTaskId;
+    }
+
+    /**
+     * Returns {@link #TRIGGER_PREFIX} followed by the given window task id.
+     */
+    public static String getWindowTriggerTaskPrefix(String windowTaskId) {
+        return TRIGGER_PREFIX + windowTaskId;
+    }
+
+    /**
+     * Reads the in-memory tuple cache limit from the topology configuration, falling back to
+     * {@link #DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT} when the setting is absent or {@code null}.
+     */
+    private Long getWindowTuplesCacheSize(Map conf) {
+        // A key that is present but mapped to null is treated like a missing key instead of causing a
+        // NullPointerException.
+        Object configuredLimit = conf.get(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT);
+        if (configuredLimit != null) {
+            return ((Number) configuredLimit).longValue();
+        }
+        return DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT;
+    }
+
+    /**
+     * Shuts down the window manager created in {@link #prepare}.
+     */
+    @Override
+    public void cleanup() {
+        log.info("shutting down window manager");
+        tridentWindowManager.shutdown();
+    }
+
+    /**
+     * Installs an empty tuple list as this processor's state slot for the batch.
+     */
+    @Override
+    public void startBatch(ProcessorContext processorContext) {
+        // initialize state for batch
+        processorContext.state[tridentContext.getStateIndex()] = new ArrayList<TridentTuple>();
+    }
+
+    /**
+     * Projects the tuple onto the input fields and appends it to the batch state created by {@link #startBatch}.
+     */
+    @Override
+    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
+        // add tuple to the batch state
+        Object state = processorContext.state[tridentContext.getStateIndex()];
+        ((List<TridentTuple>) state).add(projection.create(tuple));
+    }
+
+    /**
+     * Hands the buffered tuples of the batch to the window manager and emits trigger results.
+     * <p>
+     * For a retried attempt the trigger ids recorded by an earlier attempt are looked up in the store and the
+     * corresponding stored values are emitted again. Otherwise, or when nothing was recorded, the pending triggers
+     * of the window manager are drained, their ids are recorded in the store and their results are emitted.
+     */
+    @Override
+    public void finishBatch(ProcessorContext processorContext) {
+
+        Object batchId = processorContext.batchId;
+        Object batchTxnId = getBatchTxnId(batchId);
+
+        log.debug("Received finishBatch of : {} ", batchId);
+        // get all the tuples in a batch and add it to trident-window-manager
+        List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
+        tridentWindowManager.addTuplesBatch(batchId, tuples);
+
+        List<Integer> pendingTriggerIds = null;
+        List<String> triggerKeys = new ArrayList<>();
+        Iterable<Object> triggerValues = null;
+
+        if (retriedAttempt(batchId)) {
+            pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
+            // The in-process key is only written for a non-empty id list (see the finally block below), so the
+            // store may hold nothing for this batch. Guard against null and fall through to the fresh-batch path
+            // rather than failing with a NullPointerException.
+            if (pendingTriggerIds != null) {
+                for (Integer pendingTriggerId : pendingTriggerIds) {
+                    triggerKeys.add(triggerKey(pendingTriggerId));
+                }
+                triggerValues = windowStore.get(triggerKeys);
+            }
+        }
+
+        // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
+        if (triggerValues == null) {
+            pendingTriggerIds = new ArrayList<>();
+            Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
+            log.debug("pending triggers at batch: {} and triggers.size: {} ", batchId, pendingTriggers.size());
+            try {
+                Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
+                List<Object> values = new ArrayList<>();
+                StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
+                while (pendingTriggersIter.hasNext()) {
+                    triggerResult = pendingTriggersIter.next();
+                    // One entry per aggregated result, so ids, keys and values stay index-aligned.
+                    for (List<Object> aggregatedResult : triggerResult.result) {
+                        String triggerKey = triggerKey(triggerResult.id);
+                        triggerKeys.add(triggerKey);
+                        values.add(aggregatedResult);
+                        pendingTriggerIds.add(triggerResult.id);
+                    }
+                    pendingTriggersIter.remove();
+                }
+                triggerValues = values;
+            } finally {
+                // store inprocess triggers of a batch in store for batch retries for any failures
+                if (!pendingTriggerIds.isEmpty()) {
+                    windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
+                }
+            }
+        }
+
+        collector.setContext(processorContext);
+        int i = 0;
+        for (Object resultValue : triggerValues) {
+            // The i-th value belongs to the i-th pending trigger id.
+            collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
+        }
+        collector.setContext(null);
+    }
+
+    /**
+     * Builds the store key under which the in-process trigger ids of the given batch transaction are kept.
+     */
+    private String inprocessTriggerKey(Object batchTxnId) {
+        return windowTriggerInprocessId + batchTxnId;
+    }
+
+    /**
+     * Returns the id of the given batch when it is an {@link IBatchID}, otherwise {@code null}.
+     */
+    public static Object getBatchTxnId(Object batchId) {
+        if (batchId instanceof IBatchID) {
+            return ((IBatchID) batchId).getId();
+        }
+        return null;
+    }
+
+    /**
+     * Returns {@code true} when the batch id is an {@link IBatchID} whose attempt id is greater than zero.
+     */
+    static boolean retriedAttempt(Object batchId) {
+        if (batchId instanceof IBatchID) {
+            return ((IBatchID) batchId).getAttemptId() > 0;
+        }
+
+        return false;
+    }
+
+    /**
+     * Returns the output factory of the collector created in {@link #prepare}.
+     */
+    @Override
+    public TridentTuple.Factory getOutputFactory() {
+        return collector.getOutputFactory();
+    }
+
+    /**
+     * Serializable pair of window task id and trigger id that is emitted as the first value of every trigger tuple.
+     */
+    public static class TriggerInfo implements Serializable {
+        public final String windowTaskId;
+        public final int triggerId;
+
+        public TriggerInfo(String windowTaskId, int triggerId) {
+            this.windowTaskId = windowTaskId;
+            this.triggerId = triggerId;
+        }
+
+        /**
+         * Returns the store key of this trigger, as built by
+         * {@link WindowTridentProcessor#generateWindowTriggerKey(String, int)}.
+         */
+        public String generateTriggerKey() {
+            return generateWindowTriggerKey(windowTaskId, triggerId);
+        }
+
+        @Override
+        public String toString() {
+            return "TriggerInfo{" +
+                    "windowTaskId='" + windowTaskId + '\'' +
+                    ", triggerId=" + triggerId +
+                    '}';
+        }
+    }
+
+    /**
+     * Returns the store key of the given trigger id for this processor's window task.
+     */
+    public String triggerKey(int triggerId) {
+        return generateWindowTriggerKey(windowTaskId, triggerId);
+    }
+
+    /**
+     * Concatenates {@link #TRIGGER_PREFIX}, the window task id and the trigger id into a store key.
+     */
+    public static String generateWindowTriggerKey(String windowTaskId, int triggerId) {
+        return TRIGGER_PREFIX + windowTaskId + triggerId;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
new file mode 100644
index 0000000..378d24f
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.trident.state.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@code State} implementation used by the windowing operation. It remembers the transaction id handed to the most
+ * recent {@link #beginCommit(Long)} call so that it can be read back through {@link #getCurrentTxId()}.
+ */
+public class WindowsState implements State {
+    private static final Logger log = LoggerFactory.getLogger(WindowsState.class);
+
+    // txId passed to the latest beginCommit call; stays null until beginCommit has been invoked.
+    private Long currentTxId;
+
+    public WindowsState() {
+    }
+
+    /**
+     * Returns the transaction id recorded by the latest {@code beginCommit} call.
+     */
+    public Long getCurrentTxId() {
+        return this.currentTxId;
+    }
+
+    /**
+     * Records {@code txId} as the current transaction id and logs it at debug level.
+     */
+    @Override
+    public void beginCommit(Long txId) {
+        this.currentTxId = txId;
+        log.debug(" WindowsState.beginCommit:: [{}] ", txId);
+    }
+
+    /**
+     * Only logs the committed transaction id at debug level; no state is changed.
+     */
+    @Override
+    public void commit(Long txId) {
+        log.debug("WindowsState.commit :: [{}]", txId);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateFactory.java
new file mode 100644
index 0000000..28893c7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+/**
+ * {@code StateFactory} that hands out a new {@code WindowsState} on every call.
+ */
+public class WindowsStateFactory implements StateFactory {
+
+    public WindowsStateFactory() {
+    }
+
+    /**
+     * Creates a fresh {@link WindowsState}; none of the arguments are used.
+     */
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        final State windowsState = new WindowsState();
+        return windowsState;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
new file mode 100644
index 0000000..45ac885
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.IllegalClassException;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@code StateUpdater<WindowsState>} instance which removes successfully emitted triggers from store
+ */
+public class WindowsStateUpdater implements StateUpdater<WindowsState> {
+
+    private static final Logger log = LoggerFactory.getLogger(WindowsStateUpdater.class);
+
+    private final WindowsStoreFactory windowStoreFactory;
+    // Created in prepare() and shut down in cleanup().
+    private WindowsStore windowsStore;
+
+    public WindowsStateUpdater(WindowsStoreFactory windowStoreFactory) {
+        this.windowStoreFactory = windowStoreFactory;
+    }
+
+    /**
+     * For every tuple, reads the {@link WindowTridentProcessor.TriggerInfo} stored in the
+     * {@link WindowTridentProcessor#TRIGGER_FIELD_NAME} field and removes two keys from the store: the trigger key
+     * itself and the in-process key of the state's current transaction id.
+     *
+     * @throws FailedException wrapping any exception raised while handling a tuple, including the case where the
+     *                         field value is {@code null} or not a {@code TriggerInfo}
+     */
+    @Override
+    public void updateState(WindowsState state, List<TridentTuple> tuples, TridentCollector collector) {
+        Long currentTxId = state.getCurrentTxId();
+        log.debug("Removing triggers using WindowStateUpdater, txnId: {} ", currentTxId);
+        for (TridentTuple tuple : tuples) {
+            try {
+                Object fieldValue = tuple.getValueByField(WindowTridentProcessor.TRIGGER_FIELD_NAME);
+                if (!(fieldValue instanceof WindowTridentProcessor.TriggerInfo)) {
+                    // Use the (Class, Object) overload: it copes with a null value, whereas calling
+                    // fieldValue.getClass() would replace the descriptive error with a NullPointerException.
+                    throw new IllegalClassException(WindowTridentProcessor.TriggerInfo.class, fieldValue);
+                }
+                WindowTridentProcessor.TriggerInfo triggerInfo = (WindowTridentProcessor.TriggerInfo) fieldValue;
+                String triggerCompletedKey = WindowTridentProcessor.getWindowTriggerInprocessIdPrefix(triggerInfo.windowTaskId) + currentTxId;
+
+                log.debug("Removing trigger key [{}] and trigger completed key [{}] from store: [{}]", triggerInfo, triggerCompletedKey, windowsStore);
+
+                windowsStore.removeAll(Lists.newArrayList(triggerInfo.generateTriggerKey(), triggerCompletedKey));
+            } catch (Exception ex) {
+                // Pass the exception itself so that the stack trace is not lost in the log.
+                log.warn(ex.getMessage(), ex);
+                collector.reportError(ex);
+                throw new FailedException(ex);
+            }
+        }
+    }
+
+    /**
+     * Creates the {@link WindowsStore} used by {@link #updateState}.
+     */
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+        windowsStore = windowStoreFactory.create();
+    }
+
+    /**
+     * Shuts down the store created in {@link #prepare}.
+     */
+    @Override
+    public void cleanup() {
+        windowsStore.shutdown();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java
new file mode 100644
index 0000000..8904b7b
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Store for storing window related entities like windowed tuples, triggers etc.
+ * <p>
+ * Entries are addressed by {@code String} keys and hold arbitrary {@code Object} values.
+ */
+public interface WindowsStore extends Serializable {
+
+    /**
+     * This can be used as a separator while generating a key from sequence of strings.
+     */
+    String KEY_SEPARATOR = "|";
+
+    /** Single-key lookup. */
+    Object get(String key);
+
+    /** Lookup for a list of keys. */
+    Iterable<Object> get(List<String> keys);
+
+    /** Enumerates the keys of the store. */
+    Iterable<String> getAllKeys();
+
+    /** Single-entry write. */
+    void put(String key, Object value);
+
+    /** Bulk write of the given {@link Entry} instances. */
+    void putAll(Collection<Entry> entries);
+
+    /** Single-key removal. */
+    void remove(String key);
+
+    /** Bulk removal of the given keys. */
+    void removeAll(Collection<String> keys);
+
+    /** Shuts the store down. */
+    void shutdown();
+
+    /**
+     * This class wraps key and value objects which can be passed to {@code putAll} method.
+     * Neither the key nor the value may be {@code null}.
+     */
+    class Entry implements Serializable {
+        public final String key;
+        public final Object value;
+
+        /**
+         * @throws IllegalArgumentException if {@code key} or {@code value} is {@code null}
+         */
+        public Entry(String key, Object value) {
+            // The key is validated before the value, so a null key is reported first.
+            nonNullCheckForKey(key);
+            nonNullCheckForValue(value);
+            this.key = key;
+            this.value = value;
+        }
+
+        /**
+         * @throws IllegalArgumentException if {@code key} is {@code null}
+         */
+        public static void nonNullCheckForKey(Object key) {
+            final boolean keyPresent = key != null;
+            Preconditions.checkArgument(keyPresent, "key argument can not be null");
+        }
+
+        /**
+         * @throws IllegalArgumentException if {@code value} is {@code null}
+         */
+        public static void nonNullCheckForValue(Object value) {
+            final boolean valuePresent = value != null;
+            Preconditions.checkArgument(valuePresent, "value argument can not be null");
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
new file mode 100644
index 0000000..409d672
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import java.io.Serializable;
+
+/**
+ * Serializable factory for {@code WindowsStore} instances.
+ */
+public interface WindowsStoreFactory extends Serializable {
+
+    /**
+     * Creates a window store.
+     *
+     * @return the created {@link WindowsStore}
+     */
+    WindowsStore create();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java
new file mode 100644
index 0000000..8f9ef1a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+/**
+ * Base {@link WindowConfig} implementation that stores the window length and the sliding length and offers a basic
+ * consistency check through {@link #validate()}.
+ */
+public abstract class BaseWindowConfig implements WindowConfig {
+    protected final int windowLength;
+    protected final int slideLength;
+
+    /**
+     * Stores both lengths as given; no validation is performed here, see {@link #validate()}.
+     */
+    protected BaseWindowConfig(int windowLength, int slideLength) {
+        this.windowLength = windowLength;
+        this.slideLength = slideLength;
+    }
+
+    @Override
+    public int getWindowLength() {
+        return windowLength;
+    }
+
+    @Override
+    public int getSlidingLength() {
+        return slideLength;
+    }
+
+    /**
+     * Checks that the sliding length does not exceed the window length. Equal lengths are accepted.
+     *
+     * @throws IllegalArgumentException if {@code slideLength} is greater than {@code windowLength}
+     */
+    public void validate() {
+        if (slideLength > windowLength) {
+            // Message matches the check: equality is allowed, only a larger slide length is rejected.
+            throw new IllegalArgumentException("slideLength '" + slideLength + "' should always be less than or equal to windowLength '" + windowLength + "'");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
new file mode 100644
index 0000000..a0dd13c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+/**
+ * Sliding window configuration expressed in event counts: a window of {@code windowLength} events that slides
+ * every {@code slideLength} events. Instances are obtained through {@link #of(int, int)}.
+ */
+public final class SlidingCountWindow extends BaseWindowConfig {
+
+    private SlidingCountWindow(int windowLength, int slideLength) {
+        super(windowLength, slideLength);
+    }
+
+    /**
+     * Creates a configuration with the given window count and sliding count.
+     */
+    public static SlidingCountWindow of(int windowCount, int slidingCount) {
+        final SlidingCountWindow window = new SlidingCountWindow(windowCount, slidingCount);
+        return window;
+    }
+
+    /**
+     * Always {@code Type.SLIDING_COUNT}.
+     */
+    @Override
+    public Type getWindowType() {
+        return Type.SLIDING_COUNT;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
new file mode 100644
index 0000000..f2fe291
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+import org.apache.storm.topology.base.BaseWindowedBolt;
+
+/**
+ * Sliding window configuration expressed in durations: a window spanning {@code windowLength} that slides at every
+ * {@code slideLength} interval. Instances are obtained through
+ * {@link #of(BaseWindowedBolt.Duration, BaseWindowedBolt.Duration)}.
+ */
+public final class SlidingDurationWindow extends BaseWindowConfig {
+
+    private SlidingDurationWindow(int windowLength, int slideLength) {
+        super(windowLength, slideLength);
+    }
+
+    /**
+     * Creates a configuration from the {@code value} of the two given durations.
+     */
+    public static SlidingDurationWindow of(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingDuration) {
+        final int windowLength = windowDuration.value;
+        final int slideLength = slidingDuration.value;
+        return new SlidingDurationWindow(windowLength, slideLength);
+    }
+
+    /**
+     * Always {@code Type.SLIDING_DURATION}.
+     */
+    @Override
+    public Type getWindowType() {
+        return Type.SLIDING_DURATION;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
new file mode 100644
index 0000000..a5f3528
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+/**
+ * Represents tumbling count window configuration. Window tumbles at each given {@code windowLength} count of events.
+ */
+public final class TumblingCountWindow extends BaseWindowConfig {
+
+ private TumblingCountWindow(int windowLength) {
+ super(windowLength, windowLength);
+ }
+
+ @Override
+ public Type getWindowType() {
+ return Type.TUMBLING_COUNT;
+ }
+
+ public static TumblingCountWindow of(int windowLength) {
+ return new TumblingCountWindow(windowLength);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
new file mode 100644
index 0000000..8beb68d
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+import org.apache.storm.topology.base.BaseWindowedBolt;
+
+/**
+ * Represents tumbling duration window configuration. Window tumbles every given {@code windowLength} duration.
+ */
+public final class TumblingDurationWindow extends BaseWindowConfig {
+
+ private TumblingDurationWindow(int windowLength) {
+ super(windowLength, windowLength);
+ }
+
+ @Override
+ public Type getWindowType() {
+ return Type.TUMBLING_DURATION;
+ }
+
+ public static TumblingDurationWindow of(BaseWindowedBolt.Duration windowLength) {
+ return new TumblingDurationWindow(windowLength.value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
new file mode 100644
index 0000000..49347e7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+import java.io.Serializable;
+
+/**
+ * Windowing configuration with window and sliding length.
+ */
+public interface WindowConfig extends Serializable {
+
+ /**
+ * Returns the length of the window.
+ * @return
+ */
+ public int getWindowLength();
+
+ /**
+ * Returns the sliding length of the moving window.
+ * @return
+ */
+ public int getSlidingLength();
+
+ /**
+ * Gives the type of windowing. It can be any of {@code Type} values.
+ *
+ * @return
+ */
+ public Type getWindowType();
+
+ public void validate();
+
+ public enum Type {
+ SLIDING_COUNT,
+ TUMBLING_COUNT,
+ SLIDING_DURATION,
+ TUMBLING_DURATION
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/BaseWindowStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/BaseWindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/BaseWindowStrategy.java
new file mode 100644
index 0000000..ed9befa
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/BaseWindowStrategy.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+
+/**
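+ * Base class for {@link WindowStrategy} implementations; it keeps the {@link WindowConfig} they are built from.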
+ */
+public abstract class BaseWindowStrategy<T> implements WindowStrategy<T> {
+ protected final WindowConfig windowConfig;
+
+ public BaseWindowStrategy(WindowConfig windowConfig) {
+ this.windowConfig = windowConfig;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingCountWindowStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingCountWindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingCountWindowStrategy.java
new file mode 100644
index 0000000..c26b795
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingCountWindowStrategy.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.windowing.CountEvictionPolicy;
+import org.apache.storm.windowing.CountTriggerPolicy;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TriggerHandler;
+import org.apache.storm.windowing.TriggerPolicy;
+
+/**
+ * This class represents sliding window strategy based on the sliding window count and sliding interval count from the
+ * given {@code slidingCountWindow} configuration.
+ */
+public class SlidingCountWindowStrategy<T> extends BaseWindowStrategy<T> {
+
+ public SlidingCountWindowStrategy(WindowConfig slidingCountWindow) {
+ super(slidingCountWindow);
+ }
+
+ /**
+ * Returns a {@code TriggerPolicy} which triggers for every count of given sliding window.
+ *
+ * @param triggerHandler
+ * @param evictionPolicy
+ * @return
+ */
+ @Override
+ public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
+ return new CountTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
+ }
+
+ /**
+ * Returns an {@code EvictionPolicy} instance which evicts elements after a count of given window length.
+ *
+ * @return
+ */
+ @Override
+ public EvictionPolicy<T> getEvictionPolicy() {
+ return new CountEvictionPolicy<>(windowConfig.getWindowLength());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java
new file mode 100644
index 0000000..9e71220
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TimeEvictionPolicy;
+import org.apache.storm.windowing.TimeTriggerPolicy;
+import org.apache.storm.windowing.TriggerHandler;
+import org.apache.storm.windowing.TriggerPolicy;
+
+/**
+ * This class represents sliding window strategy based on the sliding window duration and sliding interval from the
+ * given {@code slidingDurationWindow} configuration.
+ *
+ **/
+public final class SlidingDurationWindowStrategy<T> extends BaseWindowStrategy<T> {
+
+ public SlidingDurationWindowStrategy(WindowConfig slidingDurationWindow) {
+ super(slidingDurationWindow);
+ }
+
+ /**
+ * Returns a {@code TriggerPolicy} which triggers for every configured sliding window duration.
+ *
+ * @param triggerHandler
+ * @param evictionPolicy
+ * @return
+ */
+ @Override
+ public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
+ return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
+ }
+
+ /**
+ * Returns an {@code EvictionPolicy} instance which evicts elements after window duration is reached.
+ *
+ * @return
+ */
+ @Override
+ public EvictionPolicy<T> getEvictionPolicy() {
+ return new TimeEvictionPolicy<>(windowConfig.getWindowLength());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingCountWindowStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingCountWindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingCountWindowStrategy.java
new file mode 100644
index 0000000..5e4d6fe
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingCountWindowStrategy.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.windowing.CountEvictionPolicy;
+import org.apache.storm.windowing.CountTriggerPolicy;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TriggerHandler;
+import org.apache.storm.windowing.TriggerPolicy;
+
+/**
+ * This class represents tumbling window strategy based on the window count from the
+ * given {@code tumblingCountWindow} configuration. In this strategy, window and sliding lengths are equal.
+ *
+ */
+public final class TumblingCountWindowStrategy<T> extends BaseWindowStrategy<T> {
+
+ public TumblingCountWindowStrategy(WindowConfig tumblingCountWindow) {
+ super(tumblingCountWindow);
+ }
+
+ /**
+ * Returns a {@code TriggerPolicy} which triggers for every count of given sliding window.
+ *
+ * @param triggerHandler
+ * @param evictionPolicy
+ * @return
+ */
+ @Override
+ public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
+ return new CountTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
+ }
+
+ /**
+ * Returns an {@code EvictionPolicy} instance which evicts elements after a count of given window length.
+ *
+ * @return
+ */
+ @Override
+ public EvictionPolicy<T> getEvictionPolicy() {
+ return new CountEvictionPolicy<>(windowConfig.getWindowLength());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingDurationWindowStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingDurationWindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingDurationWindowStrategy.java
new file mode 100644
index 0000000..4478667
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingDurationWindowStrategy.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TimeEvictionPolicy;
+import org.apache.storm.windowing.TimeTriggerPolicy;
+import org.apache.storm.windowing.TriggerHandler;
+import org.apache.storm.windowing.TriggerPolicy;
+
+/**
+ * This class represents tumbling window strategy based on the window duration from the
+ * given {@code tumblingDurationWindow} configuration. In this strategy, window and sliding durations are equal.
+ *
+ */
+public final class TumblingDurationWindowStrategy<T> extends BaseWindowStrategy<T> {
+
+ public TumblingDurationWindowStrategy(WindowConfig tumblingDurationWindow) {
+ super(tumblingDurationWindow);
+ }
+
+ /**
+ * Returns a {@code TriggerPolicy} which triggers for every given sliding duration.
+ *
+ * @param triggerHandler
+ * @param evictionPolicy
+ * @return
+ */
+ @Override
+ public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
+ return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
+ }
+
+ /**
+ * Returns an {@code EvictionPolicy} instance which evicts elements after given window duration.
+ *
+ * @return
+ */
+ @Override
+ public EvictionPolicy<T> getEvictionPolicy() {
+ return new TimeEvictionPolicy<>(windowConfig.getWindowLength());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategy.java
new file mode 100644
index 0000000..1dfb264
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategy.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TriggerHandler;
+import org.apache.storm.windowing.TriggerPolicy;
+/**
+ * Strategy for windowing which will have respective trigger and eviction policies.
+ */
+public interface WindowStrategy<T> {
+
+ /**
+ * Returns a {@code TriggerPolicy} by creating with {@code triggerHandler} and {@code evictionPolicy} with
+ * the given configuration.
+ *
+ * @param triggerHandler
+ * @param evictionPolicy
+ * @return
+ */
+ public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy);
+
+ /**
+ * Returns an {@code EvictionPolicy} instance for this strategy with the given configuration.
+ *
+ * @return
+ */
+ public EvictionPolicy<T> getEvictionPolicy();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
new file mode 100644
index 0000000..d8a3918
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+
+/**
+ *
+ */
+public final class WindowStrategyFactory {
+
+ private WindowStrategyFactory() {
+ }
+
+ /**
+ * Creates a {@code WindowStrategy} instance based on the given {@code windowConfig}.
+ *
+ * @param windowConfig
+ * @return
+ */
+ public static <T> WindowStrategy<T> create(WindowConfig windowConfig) {
+ WindowStrategy<T> windowStrategy = null;
+ WindowConfig.Type windowType = windowConfig.getWindowType();
+ switch(windowType) {
+ case SLIDING_COUNT:
+ windowStrategy = new SlidingCountWindowStrategy<>(windowConfig);
+ break;
+ case TUMBLING_COUNT:
+ windowStrategy = new TumblingCountWindowStrategy<>(windowConfig);
+ break;
+ case SLIDING_DURATION:
+ windowStrategy = new SlidingDurationWindowStrategy<>(windowConfig);
+ break;
+ case TUMBLING_DURATION:
+ windowStrategy = new TumblingDurationWindowStrategy<>(windowConfig);
+ break;
+ default:
+ throw new IllegalArgumentException("Given WindowConfig of type "+windowType+" is not supported");
+ }
+
+ return windowStrategy;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java b/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java
index eef4968..ac1fa1c 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java
@@ -21,7 +21,7 @@ package org.apache.storm.windowing;
* The callback fired by {@link TriggerPolicy} when the trigger
* condition is satisfied.
*/
-interface TriggerHandler {
+public interface TriggerHandler {
/**
* The code to execute when the {@link TriggerPolicy} condition is satisfied.
*
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
new file mode 100644
index 0000000..03f298d
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident;
+
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.windowing.InMemoryWindowsStore;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.strategy.SlidingCountWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.TumblingCountWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.TumblingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class TridentWindowingTest {
+
+ @Test
+ public void testWindowStrategyInstances() throws Exception {
+
+ WindowStrategy<Object> tumblingCountStrategy = WindowStrategyFactory.create(TumblingCountWindow.of(10));
+ Assert.assertTrue(tumblingCountStrategy instanceof TumblingCountWindowStrategy);
+
+ WindowStrategy<Object> slidingCountStrategy = WindowStrategyFactory.create(SlidingCountWindow.of(100, 10));
+ Assert.assertTrue(slidingCountStrategy instanceof SlidingCountWindowStrategy);
+
+ WindowStrategy<Object> tumblingDurationStrategy = WindowStrategyFactory.create(
+ TumblingDurationWindow.of(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)));
+ Assert.assertTrue(tumblingDurationStrategy instanceof TumblingDurationWindowStrategy);
+
+ WindowStrategy<Object> slidingDurationStrategy = WindowStrategyFactory.create(
+ SlidingDurationWindow.of(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS),
+ new BaseWindowedBolt.Duration(2, TimeUnit.SECONDS)));
+ Assert.assertTrue(slidingDurationStrategy instanceof SlidingDurationWindowStrategy);
+ }
+
+ @Test
+ public void testWindowConfig() {
+ int windowLength = 9;
+ TumblingCountWindow tumblingCountWindow = TumblingCountWindow.of(windowLength);
+ Assert.assertTrue(tumblingCountWindow.getWindowLength() == windowLength);
+ Assert.assertTrue(tumblingCountWindow.getSlidingLength() == windowLength);
+
+ windowLength = 10;
+ int slidingLength = 2;
+ SlidingCountWindow slidingCountWindow = SlidingCountWindow.of(10, 2);
+ Assert.assertTrue(slidingCountWindow.getWindowLength() == windowLength);
+ Assert.assertTrue(slidingCountWindow.getSlidingLength() == slidingLength);
+
+ windowLength = 20;
+ TumblingDurationWindow tumblingDurationWindow = TumblingDurationWindow.of(new BaseWindowedBolt.Duration(windowLength, TimeUnit.SECONDS));
+ Assert.assertTrue(tumblingDurationWindow.getWindowLength() == windowLength*1000);
+ Assert.assertTrue(tumblingDurationWindow.getSlidingLength() == windowLength*1000);
+
+ windowLength = 50;
+ slidingLength = 10;
+ SlidingDurationWindow slidingDurationWindow = SlidingDurationWindow.of(new BaseWindowedBolt.Duration(windowLength, TimeUnit.SECONDS),
+ new BaseWindowedBolt.Duration(slidingLength, TimeUnit.SECONDS));
+ Assert.assertTrue(slidingDurationWindow.getWindowLength() == windowLength*1000);
+ Assert.assertTrue(slidingDurationWindow.getSlidingLength() == slidingLength*1000);
+ }
+
+ @Test
+ public void testInMemoryWindowStore() {
+ InMemoryWindowsStore store = new InMemoryWindowsStore();
+ String keyPrefix = "key";
+ String valuePrefix = "valuePrefix";
+
+ int ct = 10;
+ for (int i=0; i<ct; i++) {
+ store.put(keyPrefix +i, valuePrefix +i);
+ }
+
+ for (int i=0; i<ct; i++) {
+ Assert.assertTrue((valuePrefix + i).equals(store.get(keyPrefix + i)));
+ }
+
+ store.remove(keyPrefix+1);
+ Assert.assertNull(store.get(keyPrefix+1));
+
+ }
+
+}
\ No newline at end of file
[7/9] storm git commit: STORM-676 Refactoring of WindowConfig APIs
Posted by sr...@apache.org.
STORM-676 Refactoring of WindowConfig APIs
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8c263c7c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8c263c7c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8c263c7c
Branch: refs/heads/1.x-branch
Commit: 8c263c7c08a3ebd11a1d5df4996b7e12422cd721
Parents: b08d7ea
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Wed Mar 23 22:35:08 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Sun Mar 27 10:47:14 2016 +0530
----------------------------------------------------------------------
.../windowing/AbstractTridentWindowManager.java | 4 +-
.../windowing/config/SlidingCountWindow.java | 7 ++-
.../windowing/config/SlidingDurationWindow.java | 6 +-
.../windowing/config/TumblingCountWindow.java | 8 ++-
.../config/TumblingDurationWindow.java | 6 +-
.../trident/windowing/config/WindowConfig.java | 4 +-
.../strategy/WindowStrategyFactory.java | 60 --------------------
.../storm/trident/TridentWindowingTest.java | 25 ++++----
8 files changed, 33 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
index aac18d3..f93527a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -25,7 +25,6 @@ import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.trident.windowing.config.WindowConfig;
import org.apache.storm.trident.windowing.strategy.WindowStrategy;
-import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
import org.apache.storm.windowing.EvictionPolicy;
import org.apache.storm.windowing.TriggerPolicy;
import org.apache.storm.windowing.WindowLifecycleListener;
@@ -34,7 +33,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
@@ -71,7 +69,7 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());
- WindowStrategy<T> windowStrategy = WindowStrategyFactory.create(windowConfig);
+ WindowStrategy<T> windowStrategy = windowConfig.getWindowStrategy();
EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
windowManager.setEvictionPolicy(evictionPolicy);
triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
index a0dd13c..2e2d388 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
@@ -18,6 +18,9 @@
*/
package org.apache.storm.trident.windowing.config;
+import org.apache.storm.trident.windowing.strategy.SlidingCountWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+
/**
* Represents configuration of sliding window based on count of events. Window of length {@code windowLength} slides
* at every count of given {@code slideLength}
@@ -30,8 +33,8 @@ public final class SlidingCountWindow extends BaseWindowConfig {
}
@Override
- public Type getWindowType() {
- return Type.SLIDING_COUNT;
+ public <T> WindowStrategy<T> getWindowStrategy() {
+ return new SlidingCountWindowStrategy<>(this);
}
public static SlidingCountWindow of(int windowCount, int slidingCount) {
http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
index f2fe291..befd4e3 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
@@ -19,6 +19,8 @@
package org.apache.storm.trident.windowing.config;
import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
/**
* Represents configuration of sliding window based on duration. Window duration of {@code windowLength} slides
@@ -32,8 +34,8 @@ public final class SlidingDurationWindow extends BaseWindowConfig {
}
@Override
- public Type getWindowType() {
- return Type.SLIDING_DURATION;
+ public <T> WindowStrategy<T> getWindowStrategy() {
+ return new SlidingDurationWindowStrategy<>(this);
}
public static SlidingDurationWindow of(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingDuration) {
http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
index a5f3528..1988850 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
@@ -18,6 +18,10 @@
*/
package org.apache.storm.trident.windowing.config;
+import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.TumblingCountWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+
/**
* Represents tumbling count window configuration. Window tumbles at each given {@code windowLength} count of events.
*/
@@ -28,8 +32,8 @@ public final class TumblingCountWindow extends BaseWindowConfig {
}
@Override
- public Type getWindowType() {
- return Type.TUMBLING_COUNT;
+ public <T> WindowStrategy<T> getWindowStrategy() {
+ return new TumblingCountWindowStrategy<>(this);
}
public static TumblingCountWindow of(int windowLength) {
http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
index 8beb68d..3881a74 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
@@ -19,6 +19,8 @@
package org.apache.storm.trident.windowing.config;
import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.windowing.strategy.TumblingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
/**
* Represents tumbling duration window configuration. Window tumbles every given {@code windowLength} duration.
@@ -30,8 +32,8 @@ public final class TumblingDurationWindow extends BaseWindowConfig {
}
@Override
- public Type getWindowType() {
- return Type.TUMBLING_DURATION;
+ public <T> WindowStrategy<T> getWindowStrategy() {
+ return new TumblingDurationWindowStrategy<>(this);
}
public static TumblingDurationWindow of(BaseWindowedBolt.Duration windowLength) {
http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
index 49347e7..7cb78ee 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
@@ -18,6 +18,8 @@
*/
package org.apache.storm.trident.windowing.config;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+
import java.io.Serializable;
/**
@@ -42,7 +44,7 @@ public interface WindowConfig extends Serializable {
*
* @return
*/
- public Type getWindowType();
+ public <T> WindowStrategy<T> getWindowStrategy();
public void validate();
http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
deleted file mode 100644
index d8a3918..0000000
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.trident.windowing.strategy;
-
-import org.apache.storm.trident.windowing.config.WindowConfig;
-
-/**
- *
- */
-public final class WindowStrategyFactory {
-
- private WindowStrategyFactory() {
- }
-
- /**
- * Creates a {@code WindowStrategy} instance based on the given {@code windowConfig}.
- *
- * @param windowConfig
- * @return
- */
- public static <T> WindowStrategy<T> create(WindowConfig windowConfig) {
- WindowStrategy<T> windowStrategy = null;
- WindowConfig.Type windowType = windowConfig.getWindowType();
- switch(windowType) {
- case SLIDING_COUNT:
- windowStrategy = new SlidingCountWindowStrategy<>(windowConfig);
- break;
- case TUMBLING_COUNT:
- windowStrategy = new TumblingCountWindowStrategy<>(windowConfig);
- break;
- case SLIDING_DURATION:
- windowStrategy = new SlidingDurationWindowStrategy<>(windowConfig);
- break;
- case TUMBLING_DURATION:
- windowStrategy = new TumblingDurationWindowStrategy<>(windowConfig);
- break;
- default:
- throw new IllegalArgumentException("Given WindowConfig of type "+windowType+" is not supported");
- }
-
- return windowStrategy;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
index 03f298d..4b82b89 100644
--- a/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
+++ b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
@@ -24,15 +24,8 @@ import org.apache.storm.trident.windowing.config.SlidingCountWindow;
import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
import org.apache.storm.trident.windowing.config.TumblingCountWindow;
import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
-import org.apache.storm.trident.windowing.strategy.SlidingCountWindowStrategy;
-import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy;
-import org.apache.storm.trident.windowing.strategy.TumblingCountWindowStrategy;
-import org.apache.storm.trident.windowing.strategy.TumblingDurationWindowStrategy;
-import org.apache.storm.trident.windowing.strategy.WindowStrategy;
-import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
-import org.junit.After;
+import org.apache.storm.trident.windowing.strategy.*;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
@@ -45,19 +38,21 @@ public class TridentWindowingTest {
@Test
public void testWindowStrategyInstances() throws Exception {
- WindowStrategy<Object> tumblingCountStrategy = WindowStrategyFactory.create(TumblingCountWindow.of(10));
+ WindowStrategy<Object> tumblingCountStrategy = TumblingCountWindow.of(10).getWindowStrategy();
Assert.assertTrue(tumblingCountStrategy instanceof TumblingCountWindowStrategy);
- WindowStrategy<Object> slidingCountStrategy = WindowStrategyFactory.create(SlidingCountWindow.of(100, 10));
+ WindowStrategy<Object> slidingCountStrategy = SlidingCountWindow.of(100, 10).getWindowStrategy();
Assert.assertTrue(slidingCountStrategy instanceof SlidingCountWindowStrategy);
- WindowStrategy<Object> tumblingDurationStrategy = WindowStrategyFactory.create(
- TumblingDurationWindow.of(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)));
+ WindowStrategy<Object> tumblingDurationStrategy = TumblingDurationWindow.of(
+ new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))
+ .getWindowStrategy();
Assert.assertTrue(tumblingDurationStrategy instanceof TumblingDurationWindowStrategy);
- WindowStrategy<Object> slidingDurationStrategy = WindowStrategyFactory.create(
- SlidingDurationWindow.of(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS),
- new BaseWindowedBolt.Duration(2, TimeUnit.SECONDS)));
+ WindowStrategy<Object> slidingDurationStrategy = SlidingDurationWindow.of(
+ new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS),
+ new BaseWindowedBolt.Duration(2, TimeUnit.SECONDS))
+ .getWindowStrategy();
Assert.assertTrue(slidingDurationStrategy instanceof SlidingDurationWindowStrategy);
}
[9/9] storm git commit: Added STORM-676 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-676 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b0db246a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b0db246a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b0db246a
Branch: refs/heads/1.x-branch
Commit: b0db246add3d2ca6c9b23784c6a34a92252bb20b
Parents: b661bf8
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Sun Mar 27 23:06:28 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sun Mar 27 23:06:28 2016 -0700
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b0db246a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 06eb7be..ae99355 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.0.0
+ * STORM-676: Storm Trident support for sliding/tumbling windows
* STORM-1630: Add guide page for Windows users
* STORM-1655: Flux doesn't set return code to non-zero when there's any exception while deploying topology to remote cluster
* STORM-1654: HBaseBolt creates tick tuples with no interval when we don't set flushIntervalSecs
[2/9] storm git commit: STORM-676 Upmerged and resolved conflicts
Posted by sr...@apache.org.
STORM-676 Upmerged and resolved conflicts
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/89c03b83
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/89c03b83
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/89c03b83
Branch: refs/heads/1.x-branch
Commit: 89c03b83e990aa7fbc732caf055e6bd09e2fc479
Parents: 139a8a3
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Wed Mar 23 12:20:21 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Sun Mar 27 10:45:52 2016 +0530
----------------------------------------------------------------------
examples/storm-starter/pom.xml | 44 ++-
.../TridentHBaseWindowingStoreTopology.java | 124 +++++++++
.../TridentWindowingInmemoryStoreTopology.java | 134 +++++++++
.../trident/windowing/HBaseWindowsStore.java | 275 +++++++++++++++++++
.../windowing/HBaseWindowsStoreFactory.java | 51 ++++
storm-core/src/jvm/org/apache/storm/Config.java | 8 +
.../jvm/org/apache/storm/trident/Stream.java | 223 +++++++++++++--
.../apache/storm/trident/TridentTopology.java | 4 +
.../storm/trident/fluent/UniqueIdGen.java | 14 +-
.../storm/trident/operation/builtin/Debug.java | 4 +-
.../windowing/AbstractTridentWindowManager.java | 241 ++++++++++++++++
.../windowing/ITridentWindowManager.java | 59 ++++
.../windowing/InMemoryTridentWindowManager.java | 78 ++++++
.../trident/windowing/InMemoryWindowsStore.java | 200 ++++++++++++++
.../windowing/InMemoryWindowsStoreFactory.java | 37 +++
.../StoreBasedTridentWindowManager.java | 223 +++++++++++++++
.../trident/windowing/TridentBatchTuple.java | 42 +++
.../windowing/WindowTridentProcessor.java | 260 ++++++++++++++++++
.../storm/trident/windowing/WindowsState.java | 52 ++++
.../trident/windowing/WindowsStateFactory.java | 40 +++
.../trident/windowing/WindowsStateUpdater.java | 81 ++++++
.../storm/trident/windowing/WindowsStore.java | 78 ++++++
.../trident/windowing/WindowsStoreFactory.java | 35 +++
.../windowing/config/BaseWindowConfig.java | 48 ++++
.../windowing/config/SlidingCountWindow.java | 40 +++
.../windowing/config/SlidingDurationWindow.java | 42 +++
.../windowing/config/TumblingCountWindow.java | 39 +++
.../config/TumblingDurationWindow.java | 40 +++
.../trident/windowing/config/WindowConfig.java | 55 ++++
.../windowing/strategy/BaseWindowStrategy.java | 32 +++
.../strategy/SlidingCountWindowStrategy.java | 59 ++++
.../strategy/SlidingDurationWindowStrategy.java | 60 ++++
.../strategy/TumblingCountWindowStrategy.java | 60 ++++
.../TumblingDurationWindowStrategy.java | 60 ++++
.../windowing/strategy/WindowStrategy.java | 45 +++
.../strategy/WindowStrategyFactory.java | 60 ++++
.../apache/storm/windowing/TriggerHandler.java | 2 +-
.../storm/trident/TridentWindowingTest.java | 110 ++++++++
38 files changed, 3019 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index a44e14c..e702a5d 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -31,10 +31,13 @@
<name>storm-starter</name>
<properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <!-- see comment below... This fixes an annoyance with intellij -->
- <provided.scope>provided</provided.scope>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <!-- see comment below... This fixes an annoyance with intellij -->
+ <provided.scope>provided</provided.scope>
+ <hbase.version>0.98.4-hadoop2</hbase.version>
+ <hbase.version>1.1.2</hbase.version>
</properties>
+
<profiles>
<profile>
<id>intellij</id>
@@ -139,6 +142,11 @@
<artifactId>storm-hdfs</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hbase</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
@@ -167,6 +175,36 @@
<artifactId>storm-redis</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
new file mode 100644
index 0000000..24cc8d9
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class TridentHBaseWindowingStoreTopology {
+ private static final Logger log = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class);
+
+ public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception {
+ FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+ new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+ new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+ spout.setCycle(true);
+
+ TridentTopology topology = new TridentTopology();
+
+ Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+ new Split(), new Fields("word"))
+ .window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+// .tumblingTimeWindow(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+ .each(new Fields("count"), new Debug())
+ .each(new Fields("count"), new Echo(), new Fields("ct"));
+
+ return topology.build();
+ }
+
+ public static class Split extends BaseFunction {
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ String sentence = tuple.getString(0);
+ for (String word : sentence.split(" ")) {
+ collector.emit(new Values(word));
+ }
+ }
+ }
+
+ static class Echo implements Function {
+
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ log.info("##########Echo.execute: " + tuple);
+ collector.emit(tuple.getValues());
+ }
+
+ @Override
+ public void prepare(Map conf, TridentOperationContext context) {
+
+ }
+
+ @Override
+ public void cleanup() {
+
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Config conf = new Config();
+ conf.setMaxSpoutPending(20);
+ conf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 2);
+
+ // window-state table should already be created with cf:tuples column
+ HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
+
+ if (args.length == 0) {
+ LocalCluster cluster = new LocalCluster();
+ String topologyName = "wordCounterWithWindowing";
+ cluster.submitTopology(topologyName, conf, buildTopology(windowStoreFactory));
+ Utils.sleep(120 * 1000);
+ cluster.killTopology(topologyName);
+ cluster.shutdown();
+ System.exit(0);
+ } else {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(windowStoreFactory));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
new file mode 100644
index 0000000..5f0cb4f
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseAggregator;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class TridentWindowingInmemoryStoreTopology {
+ private static final Logger log = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class);
+
+ public static StormTopology buildTopology(WindowsStoreFactory windowStore, WindowConfig windowConfig) throws Exception {
+ FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+ new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+ new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+ spout.setCycle(true);
+
+ TridentTopology topology = new TridentTopology();
+
+ Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+ new Split(), new Fields("word"))
+ .window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+// .aggregate(new CountAsAggregator(), new Fields("count"))
+ .each(new Fields("count"), new Debug())
+ .each(new Fields("count"), new Echo(), new Fields("ct"))
+ .each(new Fields("ct"), new Debug());
+
+ return topology.build();
+ }
+
+ public static class Split extends BaseFunction {
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ String sentence = tuple.getString(0);
+ for (String word : sentence.split(" ")) {
+ collector.emit(new Values(word));
+ }
+ }
+ }
+
+ static class Echo implements Function {
+
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ log.info("##########Echo.execute: " + tuple);
+ collector.emit(tuple.getValues());
+ }
+
+ @Override
+ public void prepare(Map conf, TridentOperationContext context) {
+
+ }
+
+ @Override
+ public void cleanup() {
+
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Config conf = new Config();
+ WindowsStoreFactory mapState = new InMemoryWindowsStoreFactory();
+
+ if (args.length == 0) {
+ List<? extends WindowConfig> list = Arrays.asList(
+ SlidingCountWindow.of(1000, 100)
+ ,TumblingCountWindow.of(1000)
+ ,SlidingDurationWindow.of(new BaseWindowedBolt.Duration(6, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS))
+ ,TumblingDurationWindow.of(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS))
+ );
+
+ for (WindowConfig windowConfig : list) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("wordCounter", conf, buildTopology(mapState, windowConfig));
+ Utils.sleep(60 * 1000);
+ cluster.shutdown();
+ }
+ System.exit(0);
+ } else {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(mapState, SlidingCountWindow.of(1000, 100)));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
new file mode 100644
index 0000000..47879a4
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class stores entries into hbase instance of the given configuration.
+ *
+ */
+public class HBaseWindowsStore implements WindowsStore {
+ private static final Logger log = LoggerFactory.getLogger(HBaseWindowsStore.class);
+ public static final String UTF_8 = "utf-8";
+
+ private final ThreadLocal<HTable> threadLocalHtable;
+ private Queue<HTable> htables = new ConcurrentLinkedQueue<>();
+ private final byte[] family;
+ private final byte[] qualifier;
+
+ /**
+ * Creates a store backed by the given HBase table.
+ * <p>
+ * No table connection is opened here; an {@code HTable} is created per thread on that thread's first
+ * call to {@link #htable()} and is recorded in {@code htables} so that {@link #shutdown()} can close it.
+ *
+ * @param config HBase configuration used to create each {@code HTable}
+ * @param tableName name of the table holding the window entries
+ * @param family column family under which values are stored (kept by reference, not copied)
+ * @param qualifier column qualifier under which values are stored (kept by reference, not copied)
+ */
+ public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) {
+ this.family = family;
+ this.qualifier = qualifier;
+
+ threadLocalHtable = new ThreadLocal<HTable>() {
+ @Override
+ protected HTable initialValue() {
+ try {
+ HTable hTable = new HTable(config, tableName);
+ // remember every created instance so shutdown() can close them all
+ htables.add(hTable);
+ return hTable;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ }
+
+ /**
+ * Returns the calling thread's {@code HTable}, taken from {@code threadLocalHtable}.
+ */
+ private HTable htable() {
+ return threadLocalHtable.get();
+ }
+
+ /**
+ * Encodes the given key as UTF-8 bytes for use as an HBase row key.
+ *
+ * @param key key to encode
+ * @return the UTF-8 encoded bytes of {@code key}
+ */
+ private byte[] effectiveKey(String key) {
+ try {
+ return key.getBytes(UTF_8);
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+  * Fetches and deserializes the value stored for the given key.
+  *
+  * @param key key to look up
+  * @return the deserialized value, or {@code null} when the row does not exist or holds no value in
+  *         the configured family/qualifier
+  * @throws RuntimeException wrapping the {@code IOException} when the HBase read fails
+  */
+ @Override
+ public Object get(String key) {
+     WindowsStore.Entry.nonNullCheckForKey(key);
+
+     Result result;
+     try {
+         result = htable().get(new Get(effectiveKey(key)));
+     } catch (IOException e) {
+         throw new RuntimeException(e);
+     }
+
+     // A row can exist without the configured column; getValue() then returns null, which Input
+     // would reject with an exception. Treat that case like a missing key.
+     byte[] serialized = result.isEmpty() ? null : result.getValue(family, qualifier);
+     if (serialized == null) {
+         return null;
+     }
+
+     Kryo kryo = new Kryo();
+     return kryo.readClassAndObject(new Input(serialized));
+ }
+
+ @Override
+ public Iterable<Object> get(List<String> keys) {
+ List<Get> gets = new ArrayList<>();
+ for (String key : keys) {
+ WindowsStore.Entry.nonNullCheckForKey(key);
+
+ byte[] effectiveKey = effectiveKey(key);
+ gets.add(new Get(effectiveKey));
+ }
+
+ Result[] results = null;
+ try {
+ results = htable().get(gets);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ Kryo kryo = new Kryo();
+ List<Object> values = new ArrayList<>();
+ for (int i=0; i<results.length; i++) {
+ Result result = results[i];
+ if(result.isEmpty()) {
+ log.error("Got empty result for key [{}]", keys.get(i));
+ throw new RuntimeException("Received empty result for key: "+keys.get(i));
+ }
+ Input input = new Input(result.getValue(family, qualifier));
+ Object resultObject = kryo.readClassAndObject(input);
+ values.add(resultObject);
+ }
+
+ return values;
+ }
+
+ /**
+  * Lazily scans the table and returns the row keys as strings.
+  * <p>
+  * The returned {@code Iterable} is single-use: every call to {@code iterator()} hands back the same
+  * iterator over one underlying scan. The scanner is closed once the iteration is exhausted.
+  *
+  * @return the keys of all rows in the table
+  * @throws RuntimeException wrapping the {@code IOException} when the scanner cannot be opened
+  */
+ @Override
+ public Iterable<String> getAllKeys() {
+     Scan scan = new Scan();
+     // this filter makes sure to receive only Key or row but not values associated with those rows.
+     scan.setFilter(new FirstKeyOnlyFilter());
+
+     final org.apache.hadoop.hbase.client.ResultScanner scanner;
+     try {
+         scanner = htable().getScanner(scan);
+     } catch (IOException e) {
+         throw new RuntimeException(e);
+     }
+     final Iterator<Result> resultIterator = scanner.iterator();
+
+     final Iterator<String> iterator = new Iterator<String>() {
+         private boolean scannerClosed;
+
+         @Override
+         public boolean hasNext() {
+             if (scannerClosed) {
+                 return false;
+             }
+             if (resultIterator.hasNext()) {
+                 return true;
+             }
+             // nothing left to read: release the server-side scanner instead of leaking it
+             scanner.close();
+             scannerClosed = true;
+             return false;
+         }
+
+         @Override
+         public String next() {
+             if (!hasNext()) {
+                 throw new java.util.NoSuchElementException();
+             }
+             Result result = resultIterator.next();
+             try {
+                 return new String(result.getRow(), UTF_8);
+             } catch (UnsupportedEncodingException e) {
+                 throw new RuntimeException(e);
+             }
+         }
+
+         @Override
+         public void remove() {
+             throw new UnsupportedOperationException("remove operation is not supported");
+         }
+     };
+
+     return new Iterable<String>() {
+         @Override
+         public Iterator<String> iterator() {
+             return iterator;
+         }
+     };
+ }
+
+ /**
+  * Serializes the value with Kryo and stores it under the given key.
+  *
+  * @param key row key of the entry
+  * @param value value to store; must not be {@code null}
+  * @throws IllegalArgumentException when {@code value} is {@code null}
+  * @throws RuntimeException wrapping the HBase exception when the write fails
+  */
+ @Override
+ public void put(String key, Object value) {
+     WindowsStore.Entry.nonNullCheckForKey(key);
+     WindowsStore.Entry.nonNullCheckForValue(value);
+
+     if(value == null) {
+         throw new IllegalArgumentException("Invalid value of null with key: "+key);
+     }
+     Put put = new Put(effectiveKey(key));
+     Kryo kryo = new Kryo();
+     ByteArrayOutputStream serialized = new ByteArrayOutputStream();
+     Output output = new Output(serialized);
+     kryo.writeClassAndObject(output, value);
+     // Output spills its fixed-size buffer into the stream whenever it fills up, so the buffer alone
+     // only holds the tail of a large value. Flush everything and read the complete bytes from the stream.
+     output.close();
+     put.add(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), ByteBuffer.wrap(serialized.toByteArray()));
+     try {
+         htable().put(put);
+     } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+  * Serializes every entry's value with Kryo and stores all entries with one batched write.
+  *
+  * @param entries key/value pairs to store
+  * @throws RuntimeException wrapping the HBase exception when the write fails
+  */
+ @Override
+ public void putAll(Collection<Entry> entries) {
+     List<Put> list = new ArrayList<>();
+     // one Kryo instance serves the whole batch; creating one per entry is needlessly expensive
+     Kryo kryo = new Kryo();
+     for (Entry entry : entries) {
+         Put put = new Put(effectiveKey(entry.key));
+         ByteArrayOutputStream serialized = new ByteArrayOutputStream();
+         Output output = new Output(serialized);
+         kryo.writeClassAndObject(output, entry.value);
+         // Output spills its fixed-size buffer into the stream whenever it fills up, so the buffer alone
+         // only holds the tail of a large value. Flush everything and read the complete bytes from the stream.
+         output.close();
+         put.add(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), ByteBuffer.wrap(serialized.toByteArray()));
+         list.add(put);
+     }
+
+     try {
+         htable().put(list);
+     } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+  * Deletes the row stored under the given key, using the current time as the delete timestamp.
+  *
+  * @param key key of the entry to delete
+  * @throws RuntimeException wrapping the {@code IOException} when the delete fails
+  */
+ @Override
+ public void remove(String key) {
+     WindowsStore.Entry.nonNullCheckForKey(key);
+
+     final byte[] rowKey = effectiveKey(key);
+     final long deleteTimestamp = System.currentTimeMillis();
+     final Delete rowDelete = new Delete(rowKey, deleteTimestamp);
+     try {
+         htable().delete(rowDelete);
+     } catch (IOException e) {
+         throw new RuntimeException(e);
+     }
+ }
+
+ @Override
+ public void removeAll(Collection<String> keys) {
+ List<Delete> deleteBatch = new ArrayList<>();
+ for (String key : keys) {
+ WindowsStore.Entry.nonNullCheckForKey(key);
+
+ Delete delete = new Delete(effectiveKey(key), System.currentTimeMillis());
+ deleteBatch.add(delete);
+ }
+ try {
+ htable().delete(deleteBatch);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ // close all the created hTable instances
+ for (HTable htable : htables) {
+ try {
+ htable.close();
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
new file mode 100644
index 0000000..56fad58
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+
+import java.util.Map;
+
+ /**
+  * Factory producing {@link HBaseWindowsStore} instances for a fixed table, column family and qualifier.
+  */
+ public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
+     private final Map<String, Object> config;
+     private final String tableName;
+     private final byte[] family;
+     private final byte[] qualifier;
+
+     /**
+      * @param config settings copied onto the HBase configuration when a store is created
+      * @param tableName name of the table holding the window entries
+      * @param family column family under which values are stored
+      * @param qualifier column qualifier under which values are stored
+      */
+     public HBaseWindowsStoreFactory(Map<String, Object> config, String tableName, byte[] family, byte[] qualifier) {
+         this.config = config;
+         this.tableName = tableName;
+         this.family = family;
+         this.qualifier = qualifier;
+     }
+
+     /**
+      * Builds a default HBase configuration, overlays every non-null setting of {@code config} on it
+      * (values converted with {@code toString()}), and returns a store using that configuration.
+      */
+     public WindowsStore create() {
+         Configuration hbaseConfiguration = HBaseConfiguration.create();
+         for (Map.Entry<String, Object> setting : config.entrySet()) {
+             Object settingValue = setting.getValue();
+             if (settingValue == null) {
+                 continue;
+             }
+             hbaseConfiguration.set(setting.getKey(), settingValue.toString());
+         }
+         return new HBaseWindowsStore(hbaseConfiguration, tableName, family, qualifier);
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 37565b8..507614b 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1974,6 +1974,14 @@ public class Config extends HashMap<String, Object> {
public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS="topology.trident.batch.emit.interval.millis";
/**
+ * Maximum number of tuples that can be held in the in-memory cache of windowing operators for fast access without
+ * fetching them from the store.
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT="topology.trident.windowing.cache.tuple.limit";
+
+ /**
* Name of the topology. This config is automatically set by Storm when the topology is submitted.
*/
@isString
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index 4a51b56..b3a4446 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p/>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,6 +21,7 @@ import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.NullStruct;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.topology.ResourceDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
import org.apache.storm.trident.fluent.GlobalAggregationScheme;
import org.apache.storm.trident.fluent.GroupedStream;
@@ -68,10 +69,22 @@ import org.apache.storm.trident.state.StateSpec;
import org.apache.storm.trident.state.StateUpdater;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.trident.util.TridentUtils;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.trident.windowing.WindowTridentProcessor;
+import org.apache.storm.trident.windowing.WindowsStateFactory;
+import org.apache.storm.trident.windowing.WindowsStateUpdater;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.config.WindowConfig;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
+import java.util.ArrayList;
import java.util.Comparator;
+import java.util.List;
/**
* A Stream represents the core data model in Trident, and can be thought of as a "stream" of tuples that are processed
@@ -92,10 +105,10 @@ import java.util.Comparator;
*/
// TODO: need to be able to replace existing fields with the function fields (like Cascading Fields.REPLACE)
public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
- Node _node;
- TridentTopology _topology;
- String _name;
-
+ final Node _node;
+ final String _name;
+ private final TridentTopology _topology;
+
protected Stream(TridentTopology topology, String name, Node node) {
_topology = topology;
_node = node;
@@ -180,7 +193,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
*/
public GroupedStream groupBy(Fields fields) {
projectionValidation(fields);
- return new GroupedStream(this, fields);
+ return new GroupedStream(this, fields);
}
/**
@@ -287,7 +300,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
if(_node instanceof PartitionNode) {
return each(new Fields(), new TrueFilter()).partition(grouping);
} else {
- return _topology.addSourcedNode(this, new PartitionNode(_node.streamId, _name, getOutputFields(), grouping));
+ return _topology.addSourcedNode(this, new PartitionNode(_node.streamId, _name, getOutputFields(), grouping));
}
}
@@ -323,7 +336,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
functionFields,
new AggregateProcessor(inputFields, agg)));
}
-
+
public Stream stateQuery(TridentState state, Fields inputFields, QueryFunction function, Fields functionFields) {
projectionValidation(inputFields);
String stateId = state._node.stateInfo.id;
@@ -335,11 +348,11 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
_topology._colocate.get(stateId).add(n);
return _topology.addSourcedNode(this, n);
}
-
+
public TridentState partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater, Fields functionFields) {
return partitionPersist(new StateSpec(stateFactory), inputFields, updater, functionFields);
}
-
+
public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater, Fields functionFields) {
projectionValidation(inputFields);
String id = _topology.getUniqueStateId();
@@ -352,19 +365,19 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
n.stateInfo = new NodeStateInfo(id, stateSpec);
return _topology.addSourcedStateNode(this, n);
}
-
+
public TridentState partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater) {
return partitionPersist(stateFactory, inputFields, updater, new Fields());
}
-
+
public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater) {
- return partitionPersist(stateSpec, inputFields, updater, new Fields());
+ return partitionPersist(stateSpec, inputFields, updater, new Fields());
}
-
+
public Stream each(Function function, Fields functionFields) {
return each(null, function, functionFields);
}
-
+
public Stream each(Fields inputFields, Filter filter) {
return each(inputFields, new FilterExecutor(filter), new Fields());
}
@@ -448,7 +461,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
public ChainedAggregatorDeclarer chainedAgg() {
return new ChainedAggregatorDeclarer(this, new BatchGlobalAggScheme());
}
-
+
public Stream partitionAggregate(Aggregator agg, Fields functionFields) {
return partitionAggregate(null, agg, functionFields);
}
@@ -462,8 +475,8 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
return chainedAgg()
.partitionAggregate(inputFields, agg, functionFields)
.chainEnd();
- }
-
+ }
+
public Stream partitionAggregate(ReducerAggregator agg, Fields functionFields) {
return partitionAggregate(null, agg, functionFields);
}
@@ -565,7 +578,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
public Stream aggregate(Aggregator agg, Fields functionFields) {
return aggregate(null, agg, functionFields);
}
-
+
public Stream aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
projectionValidation(inputFields);
return chainedAgg()
@@ -594,19 +607,169 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
.aggregate(inputFields, agg, functionFields)
.chainEnd();
}
-
+
+ /**
+ * Returns a stream of tuples which are the aggregated results of a tumbling window of {@code windowCount} tuples.
+ * Delegates to {@link #window(WindowConfig, Fields, Aggregator, Fields)}, which uses an in-memory windows store.
+ *
+ * @param windowCount number of tuples in each window
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return the stream of aggregated window results
+ */
+ public Stream tumblingCountWindow(int windowCount, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+ return window(TumblingCountWindow.of(windowCount), inputFields, aggregator, functionFields);
+ }
+
+ /**
+ * Returns a stream of tuples which are the aggregated results of a tumbling window of {@code windowCount} tuples,
+ * using the given store factory.
+ *
+ * @param windowCount number of tuples in each window
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return the stream of aggregated window results
+ */
+ public Stream tumblingCountWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+ return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+ }
+
+ /**
+ * Returns a stream of tuples which are the aggregated results of a sliding window of {@code windowCount} tuples
+ * that slides by {@code slideCount} tuples.
+ *
+ * @param windowCount number of tuples in each window
+ * @param slideCount number of tuples by which the window slides
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return the stream of aggregated window results
+ */
+ public Stream slidingCountWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+ return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
+ }
+
+ /**
+ * Returns a stream of tuples which are the aggregated results of a window that tumbles every {@code windowDuration}.
+ *
+ * @param windowDuration represents tumbling window duration configuration
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return the stream of aggregated window results
+ */
+ public Stream tumblingTimeWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+ return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+ }
+
+ /**
+ * Returns a stream of tuples which are the aggregated results of a window of length {@code windowDuration}
+ * that slides every {@code slideDuration}.
+ *
+ * @param windowDuration represents window duration configuration
+ * @param slideDuration represents sliding duration configuration
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return the stream of aggregated window results
+ */
+ public Stream slidingTimeWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slideDuration,
+ WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+ return window(SlidingDurationWindow.of(windowDuration, slideDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+ }
+
+ /**
+ * Returns a stream of aggregated results based on the given window configuration, using an in-memory windowing
+ * tuple store.
+ *
+ * @param windowConfig window configuration like window length and slide length.
+ * @param inputFields input fields
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ * @return the stream of aggregated window results
+ */
+ public Stream window(WindowConfig windowConfig, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+ // this store is used only for storing triggered aggregated results but not tuples, as storeTuplesInStore is set
+ // to false in the call below.
+ InMemoryWindowsStoreFactory inMemoryWindowsStoreFactory = new InMemoryWindowsStoreFactory();
+ return window(windowConfig, inMemoryWindowsStoreFactory, inputFields, aggregator, functionFields, false);
+ }
+
+ /**
+ * Returns a stream of aggregated results based on the given window configuration. Tuples are stored in the
+ * store created by the given factory ({@code storeTuplesInStore} is passed as {@code true}).
+ *
+ * @param windowConfig window configuration like window length and slide length.
+ * @param windowStoreFactory intermediary tuple store for storing tuples for windowing
+ * @param inputFields input fields
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ * @return the stream of aggregated window results
+ */
+ public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,
+ Aggregator aggregator, Fields functionFields) {
+ return window(windowConfig, windowStoreFactory, inputFields, aggregator, functionFields, true);
+ }
+
+ /**
+ * Shared implementation of the windowing operations: validates the inputs, adds a processor node running a
+ * {@code WindowTridentProcessor} whose output carries the trigger field followed by {@code functionFields},
+ * attaches a state updater on the trigger field, and returns the processor output projected to
+ * {@code functionFields}.
+ *
+ * @param windowConfig window configuration; validated before use
+ * @param windowStoreFactory factory of the store handed to the processor and to the state updater
+ * @param inputFields fields projected for the aggregator
+ * @param aggregator aggregator run on each window
+ * @param functionFields fields emitted by the aggregation
+ * @param storeTuplesInStore passed through to {@code WindowTridentProcessor}
+ * @return the windowed stream projected to {@code functionFields}
+ */
+ private Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator,
+ Fields functionFields, boolean storeTuplesInStore) {
+ projectionValidation(inputFields);
+ windowConfig.validate();
+
+ Fields fields = addTriggerField(functionFields);
+
+ // when storeTuplesInStore is false then the given windowStoreFactory is only used to store triggers and
+ // that store is passed to WindowStateUpdater to remove them after committing the batch.
+ Stream stream = _topology.addSourcedNode(this,
+ new ProcessorNode(_topology.getUniqueStreamId(),
+ _name,
+ fields,
+ fields,
+ new WindowTridentProcessor(windowConfig, _topology.getUniqueWindowId(), windowStoreFactory,
+ inputFields, aggregator, storeTuplesInStore)));
+
+ Stream effectiveStream = stream.project(functionFields);
+
+ // create StateUpdater with the given windowStoreFactory to remove triggered aggregation results from the store
+ // when they are successfully processed.
+ StateFactory stateFactory = new WindowsStateFactory();
+ StateUpdater stateUpdater = new WindowsStateUpdater(windowStoreFactory);
+ stream.partitionPersist(stateFactory, new Fields(WindowTridentProcessor.TRIGGER_FIELD_NAME), stateUpdater, new Fields());
+
+ return effectiveStream;
+ }
+
+ /**
+  * Returns new {@code Fields} made of the window trigger field followed by all of {@code functionFields}.
+  */
+ private Fields addTriggerField(Fields functionFields) {
+     List<String> withTrigger = new ArrayList<>();
+     withTrigger.add(WindowTridentProcessor.TRIGGER_FIELD_NAME);
+     withTrigger.addAll(functionFields.toList());
+     return new Fields(withTrigger);
+ }
+
public TridentState partitionPersist(StateFactory stateFactory, StateUpdater updater, Fields functionFields) {
return partitionPersist(new StateSpec(stateFactory), updater, functionFields);
}
-
+
public TridentState partitionPersist(StateSpec stateSpec, StateUpdater updater, Fields functionFields) {
return partitionPersist(stateSpec, null, updater, functionFields);
}
-
+
public TridentState partitionPersist(StateFactory stateFactory, StateUpdater updater) {
return partitionPersist(stateFactory, updater, new Fields());
}
-
+
public TridentState partitionPersist(StateSpec stateSpec, StateUpdater updater) {
return partitionPersist(stateSpec, updater, new Fields());
}
@@ -622,7 +785,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
}
-
+
public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
projectionValidation(inputFields);
// replaces normal aggregation here with a global grouping because it needs to be consistent across batches
@@ -648,7 +811,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
projectionValidation(inputFields);
return global().partitionPersist(spec, inputFields, new ReducerAggStateUpdater(agg), functionFields);
}
-
+
public Stream stateQuery(TridentState state, QueryFunction function, Fields functionFields) {
return stateQuery(state, null, function, functionFields);
}
@@ -662,7 +825,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
public Fields getOutputFields() {
return _node.allOutputFields;
}
-
+
static class BatchGlobalAggScheme implements GlobalAggregationScheme<Stream> {
@Override
@@ -674,9 +837,9 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
public BatchToPartition singleEmitPartitioner() {
return new IndexHashBatchToPartition();
}
-
+
}
-
+
static class GlobalAggScheme implements GlobalAggregationScheme<Stream> {
@Override
@@ -688,7 +851,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
public BatchToPartition singleEmitPartitioner() {
return new GlobalBatchToPartition();
}
-
+
}
private void projectionValidation(Fields projFields) {
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
index e0a349b..06e1576 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -829,6 +829,10 @@ public class TridentTopology {
protected String getUniqueStateId() {
return _gen.getUniqueStateId();
}
+
+ /**
+ * Returns a new window id obtained from this topology's id generator.
+ */
+ protected String getUniqueWindowId() {
+ return _gen.getUniqueWindowId();
+ }
protected void registerNode(Node n) {
_graph.addVertex(n);
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java b/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java
index 1364faf..e063142 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java
@@ -18,17 +18,21 @@
package org.apache.storm.trident.fluent;
public class UniqueIdGen {
- int _streamCounter = 0;
-
+ private int _streamCounter = 0;
+ private int _stateCounter = 0;
+ private int windowCounter = 0;
+
public String getUniqueStreamId() {
_streamCounter++;
return "s" + _streamCounter;
}
- int _stateCounter = 0;
-
public String getUniqueStateId() {
_stateCounter++;
return "state" + _stateCounter;
- }
+ }
+
+ /**
+ * Increments the window counter and returns the next window id, formed as {@code "w"} followed by the new
+ * counter value.
+ */
+ public String getUniqueWindowId() {
+ return "w"+ (++windowCounter);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
index 07c5ae4..87c4167 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
@@ -20,6 +20,8 @@ package org.apache.storm.trident.operation.builtin;
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;
+import java.util.Date;
+
/**
* Filter for debugging purposes. The `isKeep()` method simply prints the tuple to `System.out` and returns `true`.
*/
@@ -40,7 +42,7 @@ public class Debug extends BaseFilter {
@Override
public boolean isKeep(TridentTuple tuple) {
- System.out.println(name + tuple.toString());
+ System.out.println("<"+new Date()+"> "+name + tuple.toString());
return true;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
new file mode 100644
index 0000000..2941e28
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TriggerPolicy;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.apache.storm.windowing.WindowManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Basic functionality to manage trident tuple events using {@code WindowManager} and {@code WindowsStore} for storing
+ * tuples and triggers related information.
+ *
+ */
+public abstract class AbstractTridentWindowManager<T> implements ITridentWindowManager {
+ private static final Logger log = LoggerFactory.getLogger(AbstractTridentWindowManager.class);
+
+ protected final WindowManager<T> windowManager;
+ protected final Aggregator aggregator;
+ protected final BatchOutputCollector delegateCollector;
+ protected final String windowTaskId;
+ protected final WindowsStore windowStore;
+
+ protected final Set<String> activeBatches = new HashSet<>();
+ protected final Queue<TriggerResult> pendingTriggers = new ConcurrentLinkedQueue<>();
+ protected final AtomicInteger triggerId = new AtomicInteger();
+ private final String windowTriggerCountId;
+ private final TriggerPolicy<T> triggerPolicy;
+
+ public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
+ Aggregator aggregator, BatchOutputCollector delegateCollector) {
+ this.windowTaskId = windowTaskId;
+ this.windowStore = windowStore;
+ this.aggregator = aggregator;
+ this.delegateCollector = delegateCollector;
+
+ windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;
+
+ windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());
+
+ WindowStrategy<T> windowStrategy = WindowStrategyFactory.create(windowConfig);
+ EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
+ windowManager.setEvictionPolicy(evictionPolicy);
+ triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
+ windowManager.setTriggerPolicy(triggerPolicy);
+ }
+
+ @Override
+ public void prepare() {
+ preInitialize();
+
+ initialize();
+
+ postInitialize();
+ }
+
+ private void preInitialize() {
+ log.debug("Getting current trigger count for this component/task");
+ // get trigger count value from store
+ Object result = windowStore.get(windowTriggerCountId);
+ Integer currentCount = 0;
+ if(result == null) {
+ log.info("No current trigger count in windows store.");
+ } else {
+ currentCount = (Integer) result + 1;
+ }
+ windowStore.put(windowTriggerCountId, currentCount);
+ triggerId.set(currentCount);
+ }
+
+ private void postInitialize() {
+ // start trigger once the initialization is done.
+ triggerPolicy.start();
+ }
+
+ /**
+ * Load and initialize any resources into window manager before windowing for component/task is activated.
+ */
+ protected abstract void initialize();
+
+ /**
+ * Listener to receive any activation/expiry of windowing events and take further action on them.
+ */
+ class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {
+
+ @Override
+ public void onExpiry(List<T> expiredEvents) {
+ log.debug("onExpiry is invoked");
+ onTuplesExpired(expiredEvents);
+ }
+
+ @Override
+ public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
+ log.debug("onActivation is invoked with events size: {}", events.size());
+ // trigger occurred, create an aggregation and keep them in store
+ int currentTriggerId = triggerId.incrementAndGet();
+ execAggregatorAndStoreResult(currentTriggerId, events);
+ }
+ }
+
+ /**
+ * Handle expired tuple events, for example by removing them from the cache or store.
+ *
+ * @param expiredEvents
+ */
+ protected abstract void onTuplesExpired(List<T> expiredEvents);
+
+ private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
+ List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);
+
+ // run aggregator to compute the result
+ AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
+ Object state = aggregator.init(currentTriggerId, collector);
+ for (TridentTuple resultTuple : resultTuples) {
+ aggregator.aggregate(state, resultTuple, collector);
+ }
+ aggregator.complete(state, collector);
+
+ List<List<Object>> resultantAggregatedValue = collector.values;
+
+ ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
+ new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
+ windowStore.putAll(entries);
+
+ pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
+ }
+
+ /**
+ * Return {@code TridentTuple}s from given {@code tupleEvents}.
+ * @param tupleEvents
+ * @return
+ */
+ protected abstract List<TridentTuple> getTridentTuples(List<T> tupleEvents);
+
+ /**
+ * This {@code TridentCollector} accumulates all the values emitted.
+ */
+ static class AccumulatedTuplesCollector implements TridentCollector {
+
+ final List<List<Object>> values = new ArrayList<>();
+ private final BatchOutputCollector delegateCollector;
+
+ public AccumulatedTuplesCollector(BatchOutputCollector delegateCollector) {
+ this.delegateCollector = delegateCollector;
+ }
+
+ @Override
+ public void emit(List<Object> values) {
+ this.values.add(values);
+ }
+
+ @Override
+ public void reportError(Throwable t) {
+ delegateCollector.reportError(t);
+ }
+
+ }
+
+ static class TriggerResult {
+ final int id;
+ final List<List<Object>> result;
+
+ public TriggerResult(int id, List<List<Object>> result) {
+ this.id = id;
+ this.result = result;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof TriggerResult)) return false;
+
+ TriggerResult that = (TriggerResult) o;
+
+ return id == that.id;
+
+ }
+
+ @Override
+ public int hashCode() {
+ return id;
+ }
+
+ @Override
+ public String toString() {
+ return "TriggerResult{" +
+ "id=" + id +
+ ", result=" + result +
+ '}';
+ }
+ }
+
+ public Queue<TriggerResult> getPendingTriggers() {
+ return pendingTriggers;
+ }
+
+ public void shutdown() {
+ try {
+ log.info("window manager [{}] is being shutdown", windowManager);
+ windowManager.shutdown();
+ } finally {
+ log.info("window store [{}] is being shutdown", windowStore);
+ windowStore.shutdown();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java
new file mode 100644
index 0000000..d4aef79
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Window manager to handle trident tuple events.
+ */
+public interface ITridentWindowManager {
+
+    /**
+     * This is invoked from {@code org.apache.storm.trident.planner.TridentProcessor}'s prepare method, so any
+     * initialization tasks can be done before the topology starts accepting tuples. For example:
+     * initialize the window manager with any earlier stored tuples/triggers and start the WindowManager.
+     */
+    public void prepare();
+
+    /**
+     * This is invoked from {@code org.apache.storm.trident.planner.TridentProcessor}'s cleanup method, so any
+     * cleanup operations like clearing the cache or closing the store connection can be done.
+     */
+    public void shutdown();
+
+    /**
+     * Add received batch of tuples to cache/store and add them to {@code WindowManager}.
+     *
+     * @param batchId identifier of the batch; the implementations in this package expect an {@code IBatchID}
+     * @param tuples tuples belonging to that batch
+     */
+    public void addTuplesBatch(Object batchId, List<TridentTuple> tuples);
+
+    /**
+     * Returns pending triggers to be emitted.
+     *
+     * @return queue of trigger results that are still pending
+     */
+    // TriggerResult is declared in AbstractTridentWindowManager; refer to it there rather than through a
+    // concrete subclass so this interface does not depend on one particular implementation.
+    public Queue<AbstractTridentWindowManager.TriggerResult> getPendingTriggers();
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
new file mode 100644
index 0000000..cbb30af
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.spout.IBatchID;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * This {@code ITridentWindowManager} instance stores all the tuples and trigger related information in memory.
+ */
+public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<TridentTuple> {
+ private static final Logger log = LoggerFactory.getLogger(InMemoryTridentWindowManager.class);
+
+ public InMemoryTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator,
+ BatchOutputCollector delegateCollector) {
+ super(windowConfig, windowTaskId, windowStore, aggregator, delegateCollector);
+ }
+
+ @Override
+ protected void initialize() {
+ log.debug("noop in initialize");
+ }
+
+ @Override
+ public List<TridentTuple> getTridentTuples(List<TridentTuple> tridentBatchTuples) {
+ return tridentBatchTuples;
+ }
+
+ @Override
+ public void onTuplesExpired(List<TridentTuple> expiredTuples) {
+ log.debug("InMemoryTridentWindowManager.onTuplesExpired");
+ }
+
+ /**
+  * Adds every tuple of the given batch to the window manager, unless the batch's transaction id is already
+  * present in {@code activeBatches}, in which case the batch is treated as a replay and ignored.
+  *
+  * @param batchId batch identifier; must be an {@code IBatchID}
+  * @param tuples tuples of the batch
+  * @throws IllegalArgumentException if {@code batchId} is not an {@code IBatchID}
+  */
+ @Override
+ public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
+     // check if they are already added then ignore these tuples. This batch is replayed.
+     if (activeBatches.contains(getBatchTxnId(batchId))) {
+         // SLF4J substitutes "{}" placeholders only; "%s" would be printed literally
+         log.info("Ignoring already added tuples with batch: {}", batchId);
+         return;
+     }
+
+     log.debug("Adding tuples to window-manager for batch: {}", batchId);
+     for (TridentTuple tridentTuple : tuples) {
+         windowManager.add(tridentTuple);
+     }
+ }
+
+ public String getBatchTxnId(Object batchId) {
+ if (!(batchId instanceof IBatchID)) {
+ throw new IllegalArgumentException("argument should be an IBatchId instance");
+ }
+ return ((IBatchID) batchId).getId().toString();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java
new file mode 100644
index 0000000..02d78e7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Inmemory store implementation of {@code WindowsStore} which can be backed by persistent store.
+ *
+ */
+public class InMemoryWindowsStore implements WindowsStore, Serializable {
+
+ private final ConcurrentHashMap<String, Object> store = new ConcurrentHashMap<>();
+
+ private int maxSize;
+ private AtomicInteger currentSize;
+ private WindowsStore backingStore;
+
+ public InMemoryWindowsStore() {
+ }
+
+ /**
+ *
+ * @param maxSize maximum size of inmemory store
+ * @param backingStore backing store containing the entries
+ */
+ public InMemoryWindowsStore(int maxSize, WindowsStore backingStore) {
+ this.maxSize = maxSize;
+ currentSize = new AtomicInteger();
+ this.backingStore = backingStore;
+ }
+
+ @Override
+ public Object get(String key) {
+ Object value = store.get(key);
+
+ if(value == null && backingStore != null) {
+ value = backingStore.get(key);
+ }
+
+ return value;
+ }
+
+ @Override
+ public Iterable<Object> get(List<String> keys) {
+ List<Object> values = new ArrayList<>();
+ for (String key : keys) {
+ values.add(get(key));
+ }
+ return values;
+ }
+
+ /**
+  * Returns the keys of this store: those of the backing store when one is configured, otherwise those of
+  * the in-memory map. The returned iterators do not support {@code remove()}.
+  *
+  * @return iterable over the keys
+  */
+ @Override
+ public Iterable<String> getAllKeys() {
+     if(backingStore != null) {
+         return backingStore.getAllKeys();
+     }
+
+     return new Iterable<String>() {
+         @Override
+         public Iterator<String> iterator() {
+             // obtain a fresh enumeration per call; sharing one iterator would leave every traversal
+             // after the first one empty
+             final Enumeration<String> storeEnumeration = store.keys();
+             return new Iterator<String>() {
+                 @Override
+                 public boolean hasNext() {
+                     return storeEnumeration.hasMoreElements();
+                 }
+
+                 @Override
+                 public String next() {
+                     return storeEnumeration.nextElement();
+                 }
+
+                 @Override
+                 public void remove() {
+                     throw new UnsupportedOperationException("remove operation is not supported as it is immutable.");
+                 }
+             };
+         }
+     };
+ }
+
+ @Override
+ public void put(String key, Object value) {
+ _put(key, value);
+
+ if(backingStore != null) {
+ backingStore.put(key, value);
+ }
+ }
+
+ /**
+  * Stores the entry in the in-memory map only (the backing store is handled by the callers).
+  * A new key is added only while {@code canAdd()} allows it; a key that is already cached is always
+  * refreshed so that {@code get()} never serves an outdated in-memory value.
+  *
+  * @param key entry key
+  * @param value entry value
+  */
+ private void _put(String key, Object value) {
+     if(!canAdd()) {
+         // cache is full: do not grow it, but still overwrite an entry that is already cached
+         store.replace(key, value);
+         return;
+     }
+
+     // put returns the previous value; only a genuinely new key counts towards the current size
+     if (store.put(key, value) == null) {
+         incrementCurrentSize();
+     }
+ }
+
+ private void incrementCurrentSize() {
+ if(backingStore != null) {
+ currentSize.incrementAndGet();
+ }
+ }
+
+ private boolean canAdd() {
+ return backingStore == null || currentSize.get() < maxSize;
+ }
+
+ @Override
+ public void putAll(Collection<Entry> entries) {
+ for (Entry entry : entries) {
+ _put(entry.key, entry.value);
+ }
+ if(backingStore != null) {
+ backingStore.putAll(entries);
+ }
+ }
+
+ @Override
+ public void remove(String key) {
+ _remove(key);
+
+ if(backingStore != null) {
+ backingStore.remove(key);
+ }
+ }
+
+ /**
+  * Removes the key from the in-memory map only and adjusts the size counter when an entry was present.
+  * The backing store is deliberately not touched here: {@code remove} and {@code removeAll} already
+  * forward the removal to it (the latter as a single batched call).
+  *
+  * @param key key to remove
+  */
+ private void _remove(String key) {
+     if (store.remove(key) != null) {
+         decrementSize();
+     }
+ }
+
+ private void decrementSize() {
+ if(backingStore != null) {
+ currentSize.decrementAndGet();
+ }
+ }
+
+ @Override
+ public void removeAll(Collection<String> keys) {
+ for (String key : keys) {
+ _remove(key);
+ }
+
+ if(backingStore != null) {
+ backingStore.removeAll(keys);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ store.clear();
+
+ if(backingStore != null) {
+ backingStore.shutdown();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "InMemoryWindowsStore{" +
+ " store:size = " + store.size() +
+ " backingStore = " + backingStore +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
new file mode 100644
index 0000000..32027a9
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+/**
+ * InMemoryWindowsStoreFactory contains a single instance of {@code InMemoryWindowsStore} which will be used for
+ * storing tuples and triggers of the window and successfully emitted triggers can be removed from {@code StateUpdater}.
+ *
+ */
+public class InMemoryWindowsStoreFactory implements WindowsStoreFactory {
+
+ private InMemoryWindowsStore inMemoryWindowsStore;
+
+ @Override
+ public WindowsStore create() {
+ if(inMemoryWindowsStore == null) {
+ inMemoryWindowsStore = new InMemoryWindowsStore();
+ }
+ return inMemoryWindowsStore;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
new file mode 100644
index 0000000..885e508
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.spout.IBatchID;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.tuple.TridentTupleView;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This window manager uses {@code WindowsStore} for storing tuples and other trigger related information. It maintains
+ * tuples cache of {@code maxCachedTuplesSize} without accessing store for getting them.
+ *
+ */
+public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager<TridentBatchTuple> {
+ private static final Logger log = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class);
+
+ private static final String TUPLE_PREFIX = "tu" + WindowsStore.KEY_SEPARATOR;
+
+ private final String windowTupleTaskId;
+ private final TridentTupleView.FreshOutputFactory freshOutputFactory;
+
+ private Long maxCachedTuplesSize;
+ private final Fields inputFields;
+ private AtomicLong currentCachedTuplesSize = new AtomicLong();
+
+ public StoreBasedTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator,
+ BatchOutputCollector delegateCollector, Long maxTuplesCacheSize, Fields inputFields) {
+ super(windowConfig, windowTaskId, windowStore, aggregator, delegateCollector);
+
+ this.maxCachedTuplesSize = maxTuplesCacheSize;
+ this.inputFields = inputFields;
+ freshOutputFactory = new TridentTupleView.FreshOutputFactory(inputFields);
+ windowTupleTaskId = TUPLE_PREFIX + windowTaskId;
+ }
+
+ /**
+  * Restores this task's state from the windows store: stored tuples are re-added to the window manager
+  * (without the tuple payload, which is read from the store on demand) and stored trigger results are
+  * queued as pending triggers, except for those listed under the in-process trigger keys.
+  */
+ @Override
+ protected void initialize() {
+
+     // get existing tuples and pending/unsuccessful triggers for this operator-component/task and add them to WindowManager
+     String windowTriggerInprocessId = WindowTridentProcessor.getWindowTriggerInprocessIdPrefix(windowTaskId);
+     String windowTriggerTaskId = WindowTridentProcessor.getWindowTriggerTaskPrefix(windowTaskId);
+
+     List<String> attemptedTriggerKeys = new ArrayList<>();
+     List<String> triggerKeys = new ArrayList<>();
+
+     Iterable<String> allEntriesIterable = windowStore.getAllKeys();
+     for (String key : allEntriesIterable) {
+         if (key.startsWith(windowTupleTaskId)) {
+             int tupleIndexValue = lastPart(key);
+             String batchId = secondLastPart(key);
+             log.debug("Received tuple with batch [{}] and tuple index [{}]", batchId, tupleIndexValue);
+             // tupleKey() rebuilds the store key as effectiveBatchId + tupleIndex, so the tuple must carry
+             // everything up to and including the last separator -- the same form keyOf() produces for
+             // tuples added through addTuplesBatch. Passing only the batch segment would yield keys
+             // that do not exist in the store.
+             int lastSepIndex = key.lastIndexOf(WindowsStore.KEY_SEPARATOR);
+             String effectiveBatchId = key.substring(0, lastSepIndex + 1);
+             windowManager.add(new TridentBatchTuple(effectiveBatchId, System.currentTimeMillis(), tupleIndexValue));
+         } else if (key.startsWith(windowTriggerTaskId)) {
+             triggerKeys.add(key);
+             log.debug("Received trigger with key [{}]", key);
+         } else if(key.startsWith(windowTriggerInprocessId)) {
+             attemptedTriggerKeys.add(key);
+             // two placeholders need two arguments
+             log.debug("Received earlier unsuccessful trigger [{}] from windows store [{}]", key, windowStore);
+         }
+     }
+
+     // these triggers will be retried as part of batch retries
+     Set<Integer> triggersToBeIgnored = new HashSet<>();
+     Iterable<Object> attemptedTriggers = windowStore.get(attemptedTriggerKeys);
+     for (Object attemptedTrigger : attemptedTriggers) {
+         triggersToBeIgnored.addAll((List<Integer>) attemptedTrigger);
+     }
+
+     // queue the stored trigger results that are not going to be retried
+     Iterable<Object> triggerObjects = windowStore.get(triggerKeys);
+     int i=0;
+     for (Object triggerObject : triggerObjects) {
+         // values are matched to their keys by position in triggerKeys
+         int id = lastPart(triggerKeys.get(i++));
+         if(!triggersToBeIgnored.contains(id)) {
+             log.info("Adding pending trigger value [{}]", triggerObject);
+             pendingTriggers.add(new TriggerResult(id, (List<List<Object>>) triggerObject));
+         }
+     }
+
+ }
+
+ /**
+  * Parses the segment after the last key separator as an integer.
+  *
+  * @param key store key
+  * @return integer value of the last segment
+  * @throws IllegalArgumentException if the key contains no separator
+  *         (a non-numeric last segment surfaces as {@code NumberFormatException}, a subclass)
+  */
+ private int lastPart(String key) {
+     int lastSepIndex = key.lastIndexOf(WindowsStore.KEY_SEPARATOR);
+     if (lastSepIndex < 0) {
+         // name the offending key, in the same wording secondLastPart uses
+         throw new IllegalArgumentException("key "+key+" does not have key separator '" + WindowsStore.KEY_SEPARATOR + "'");
+     }
+     return Integer.parseInt(key.substring(lastSepIndex+1));
+ }
+
+ /**
+  * Returns the segment between the second-last and the last key separator.
+  *
+  * @param key store key
+  * @return the second-last segment of the key
+  * @throws IllegalArgumentException if the key contains fewer than two separators
+  */
+ private String secondLastPart(String key) {
+     int lastSepIndex = key.lastIndexOf(WindowsStore.KEY_SEPARATOR);
+     if (lastSepIndex < 0) {
+         throw new IllegalArgumentException("key "+key+" does not have key separator '" + WindowsStore.KEY_SEPARATOR + "'");
+     }
+     String trimKey = key.substring(0, lastSepIndex);
+     int secondLastSepIndex = trimKey.lastIndexOf(WindowsStore.KEY_SEPARATOR);
+     // test the second separator here; re-testing lastSepIndex could never fail at this point
+     if (secondLastSepIndex < 0) {
+         throw new IllegalArgumentException("key "+key+" does not have second key separator '" + WindowsStore.KEY_SEPARATOR + "'");
+     }
+
+     return key.substring(secondLastSepIndex+1, lastSepIndex);
+ }
+
+ /**
+  * Writes the batch's tuples (projected to {@code inputFields}) to the windows store and then adds them to
+  * the window manager. A batch whose transaction id is already present in {@code activeBatches} is
+  * treated as a replay and ignored.
+  *
+  * @param batchId batch identifier; must be an {@code IBatchID}
+  * @param tuples tuples of the batch
+  * @throws IllegalArgumentException if {@code batchId} is not an {@code IBatchID}
+  */
+ @Override
+ public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
+     // check if they are already added then ignore these tuples. This batch is replayed.
+     if (activeBatches.contains(getBatchTxnId(batchId))) {
+         // SLF4J substitutes "{}" placeholders only; "%s" would be printed literally
+         log.info("Ignoring already added tuples with batch: {}", batchId);
+         return;
+     }
+
+     log.debug("Adding tuples to window-manager for batch: {}", batchId);
+     // the key prefix depends only on the batch, so build it once instead of once per tuple
+     String key = keyOf(batchId);
+     List<WindowsStore.Entry> entries = new ArrayList<>(tuples.size());
+     for (int i = 0; i < tuples.size(); i++) {
+         TridentTuple tridentTuple = tuples.get(i);
+         entries.add(new WindowsStore.Entry(key+i, tridentTuple.select(inputFields)));
+     }
+
+     // tuples should be available in store before they are added to window manager
+     windowStore.putAll(entries);
+
+     for (int i = 0; i < tuples.size(); i++) {
+         addToWindowManager(i, key, tuples.get(i));
+     }
+
+ }
+
+ private void addToWindowManager(int tupleIndex, String effectiveBatchId, TridentTuple tridentTuple) {
+ TridentTuple actualTuple = null;
+ if (maxCachedTuplesSize == null || currentCachedTuplesSize.get() < maxCachedTuplesSize) {
+ actualTuple = tridentTuple;
+ }
+ currentCachedTuplesSize.incrementAndGet();
+ windowManager.add(new TridentBatchTuple(effectiveBatchId, System.currentTimeMillis(), tupleIndex, actualTuple));
+ }
+
+ public String getBatchTxnId(Object batchId) {
+ if (!(batchId instanceof IBatchID)) {
+ throw new IllegalArgumentException("argument should be an IBatchId instance");
+ }
+ return ((IBatchID) batchId).getId().toString();
+ }
+
+ public String keyOf(Object batchId) {
+ return windowTupleTaskId + getBatchTxnId(batchId) + WindowsStore.KEY_SEPARATOR;
+ }
+
+ public List<TridentTuple> getTridentTuples(List<TridentBatchTuple> tridentBatchTuples) {
+ List<TridentTuple> resultTuples = new ArrayList<>();
+ List<String> keys = new ArrayList<>();
+ for (TridentBatchTuple tridentBatchTuple : tridentBatchTuples) {
+ TridentTuple tuple = collectTridentTupleOrKey(tridentBatchTuple, keys);
+ if(tuple != null) {
+ resultTuples.add(tuple);
+ }
+ }
+
+ if(keys.size() > 0) {
+ Iterable<Object> storedTupleValues = windowStore.get(keys);
+ for (Object storedTupleValue : storedTupleValues) {
+ TridentTuple tridentTuple = freshOutputFactory.create((List<Object>) storedTupleValue);
+ resultTuples.add(tridentTuple);
+ }
+ }
+
+ return resultTuples;
+ }
+
+ public TridentTuple collectTridentTupleOrKey(TridentBatchTuple tridentBatchTuple, List<String> keys) {
+ if (tridentBatchTuple.tridentTuple != null) {
+ return tridentBatchTuple.tridentTuple;
+ }
+ keys.add(tupleKey(tridentBatchTuple));
+ return null;
+ }
+
+ public void onTuplesExpired(List<TridentBatchTuple> expiredTuples) {
+ if (maxCachedTuplesSize != null) {
+ currentCachedTuplesSize.addAndGet(-expiredTuples.size());
+ }
+
+ List<String> keys = new ArrayList<>();
+ for (TridentBatchTuple expiredTuple : expiredTuples) {
+ keys.add(tupleKey(expiredTuple));
+ }
+
+ windowStore.removeAll(keys);
+ }
+
+ private String tupleKey(TridentBatchTuple tridentBatchTuple) {
+ return tridentBatchTuple.effectiveBatchId + tridentBatchTuple.tupleIndex;
+ }
+
+}
[6/9] storm git commit: STORM-676 Addressed review comments
Posted by sr...@apache.org.
STORM-676 Addressed review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b08d7eaf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b08d7eaf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b08d7eaf
Branch: refs/heads/1.x-branch
Commit: b08d7eaf7099e4da74010501a189818ec11b00bc
Parents: 3a96f20
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Wed Mar 23 19:03:55 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Sun Mar 27 10:47:03 2016 +0530
----------------------------------------------------------------------
.../trident/windowing/HBaseWindowsStore.java | 2 +-
.../windowing/HBaseWindowsStoreFactory.java | 1 +
.../jvm/org/apache/storm/trident/Stream.java | 22 +++++++++++---------
.../windowing/WindowTridentProcessor.java | 2 +-
4 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b08d7eaf/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
index ff3fbf9..b300ed6 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -55,7 +55,7 @@ public class HBaseWindowsStore implements WindowsStore {
public static final String UTF_8 = "utf-8";
private final ThreadLocal<HTable> threadLocalHtable;
- private Queue<HTable> htables = new ConcurrentLinkedQueue<>();
+ private final Queue<HTable> htables = new ConcurrentLinkedQueue<>();
private final byte[] family;
private final byte[] qualifier;
http://git-wip-us.apache.org/repos/asf/storm/blob/b08d7eaf/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
index a49bc87..a47d5fb 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
@@ -26,6 +26,7 @@ import org.apache.storm.trident.windowing.WindowsStoreFactory;
import java.util.Map;
/**
+ * Factory to create {@link HBaseWindowsStore} instances.
*
*/
public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
http://git-wip-us.apache.org/repos/asf/storm/blob/b08d7eaf/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index 444b42a..23ac34a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -611,13 +611,13 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
/**
* Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
*
- * @param windowCount represents no of tuples in the window
+ * @param windowCount represents number of tuples in the window
* @param windowStoreFactory intermediary tuple store for storing windowing tuples
* @param inputFields projected fields for aggregator
* @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
* @param functionFields fields of values to emit with aggregation.
*
- * @return
+ * @return the new stream with this operation.
*/
public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
Fields inputFields, Aggregator aggregator, Fields functionFields) {
@@ -626,7 +626,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
/**
* Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
- * and slides the window with {@code slideCount}.
+ * and slides the window after {@code slideCount}.
*
* @param windowCount represents tuples count of a window
* @param slideCount the number of tuples after which the window slides
@@ -635,7 +635,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
* @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
* @param functionFields fields of values to emit with aggregation.
*
- * @return
+ * @return the new stream with this operation.
*/
public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
Fields inputFields, Aggregator aggregator, Fields functionFields) {
@@ -643,7 +643,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
}
/**
- * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
+ * Returns a stream of tuples which are aggregated results of a window that tumbles at duration of {@code windowDuration}
*
* @param windowDuration represents tumbling window duration configuration
* @param windowStoreFactory intermediary tuple store for storing windowing tuples
@@ -651,7 +651,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
* @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
* @param functionFields fields of values to emit with aggregation.
*
- * @return
+ * @return the new stream with this operation.
*/
public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
Fields inputFields, Aggregator aggregator, Fields functionFields) {
@@ -659,7 +659,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
}
/**
- * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
+ * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slidingInterval}
* and completes a window at {@code windowDuration}
*
* @param windowDuration represents window duration configuration
@@ -669,7 +669,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
* @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
* @param functionFields fields of values to emit with aggregation.
*
- * @return
+ * @return the new stream with this operation.
*/
public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,
WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
@@ -683,7 +683,8 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
* @param inputFields input fields
* @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
* @param functionFields fields of values to emit with aggregation.
- * @return
+ *
+ * @return the new stream with this operation.
*/
public Stream window(WindowConfig windowConfig, Fields inputFields, Aggregator aggregator, Fields functionFields) {
// this store is used only for storing triggered aggregated results but not tuples as storeTuplesInStore is set
@@ -700,7 +701,8 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
* @param inputFields input fields
* @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
* @param functionFields fields of values to emit with aggregation.
- * @return
+ *
+ * @return the new stream with this operation.
*/
public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,
Aggregator aggregator, Fields functionFields) {
http://git-wip-us.apache.org/repos/asf/storm/blob/b08d7eaf/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
index 8898b13..5125e41 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
@@ -53,7 +53,7 @@ public class WindowTridentProcessor implements TridentProcessor {
public static final String TRIGGER_COUNT_PREFIX = "tc" + WindowsStore.KEY_SEPARATOR;
public static final String TRIGGER_FIELD_NAME = "_task_info";
- public static final long DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT = 100l;
+ public static final long DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT = 100L;
private final String windowId;
private final Fields inputFields;
[8/9] storm git commit: Merge branch 'STORM-676-1.x' of http://github.com/satishd/storm into STORM-676-1.x
Posted by sr...@apache.org.
Merge branch 'STORM-676-1.x' of http://github.com/satishd/storm into STORM-676-1.x
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b661bf81
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b661bf81
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b661bf81
Branch: refs/heads/1.x-branch
Commit: b661bf81ec084ee65c7793746e576dcde5157a3c
Parents: 1783d7a 8c263c7
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Sun Mar 27 22:12:26 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sun Mar 27 22:12:26 2016 -0700
----------------------------------------------------------------------
examples/storm-starter/pom.xml | 13 +-
.../TridentHBaseWindowingStoreTopology.java | 93 +++++++
.../TridentWindowingInmemoryStoreTopology.java | 98 +++++++
.../trident/windowing/HBaseWindowsStore.java | 275 +++++++++++++++++++
.../windowing/HBaseWindowsStoreFactory.java | 55 ++++
storm-core/src/jvm/org/apache/storm/Config.java | 8 +
.../jvm/org/apache/storm/trident/Stream.java | 207 ++++++++++++--
.../apache/storm/trident/TridentTopology.java | 4 +
.../storm/trident/fluent/UniqueIdGen.java | 14 +-
.../storm/trident/operation/builtin/Debug.java | 4 +-
.../windowing/AbstractTridentWindowManager.java | 238 ++++++++++++++++
.../windowing/ITridentWindowManager.java | 59 ++++
.../windowing/InMemoryTridentWindowManager.java | 72 +++++
.../trident/windowing/InMemoryWindowsStore.java | 200 ++++++++++++++
.../windowing/InMemoryWindowsStoreFactory.java | 46 ++++
.../StoreBasedTridentWindowManager.java | 217 +++++++++++++++
.../trident/windowing/TridentBatchTuple.java | 42 +++
.../windowing/WindowTridentProcessor.java | 265 ++++++++++++++++++
.../storm/trident/windowing/WindowsState.java | 52 ++++
.../trident/windowing/WindowsStateFactory.java | 40 +++
.../trident/windowing/WindowsStateUpdater.java | 81 ++++++
.../storm/trident/windowing/WindowsStore.java | 78 ++++++
.../trident/windowing/WindowsStoreFactory.java | 35 +++
.../windowing/config/BaseWindowConfig.java | 48 ++++
.../windowing/config/SlidingCountWindow.java | 43 +++
.../windowing/config/SlidingDurationWindow.java | 44 +++
.../windowing/config/TumblingCountWindow.java | 43 +++
.../config/TumblingDurationWindow.java | 42 +++
.../trident/windowing/config/WindowConfig.java | 57 ++++
.../windowing/strategy/BaseWindowStrategy.java | 32 +++
.../strategy/SlidingCountWindowStrategy.java | 59 ++++
.../strategy/SlidingDurationWindowStrategy.java | 60 ++++
.../strategy/TumblingCountWindowStrategy.java | 60 ++++
.../TumblingDurationWindowStrategy.java | 60 ++++
.../windowing/strategy/WindowStrategy.java | 45 +++
.../apache/storm/windowing/TriggerHandler.java | 2 +-
.../storm/trident/TridentWindowingTest.java | 105 +++++++
37 files changed, 2858 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
[3/9] storm git commit: STORM-676 addressed review comments
Posted by sr...@apache.org.
STORM-676 addressed review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/532bb79b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/532bb79b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/532bb79b
Branch: refs/heads/1.x-branch
Commit: 532bb79b30fdfee04c24d7ed04f63b65c4c44862
Parents: 89c03b8
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Sun Mar 13 10:25:21 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Sun Mar 27 10:46:16 2016 +0530
----------------------------------------------------------------------
.../trident/TridentHBaseWindowingStoreTopology.java | 6 ++----
.../TridentWindowingInmemoryStoreTopology.java | 7 +++----
.../hbase/trident/windowing/HBaseWindowsStore.java | 6 +++---
.../windowing/AbstractTridentWindowManager.java | 14 +++++++-------
.../windowing/InMemoryTridentWindowManager.java | 10 +++++-----
.../windowing/InMemoryWindowsStoreFactory.java | 13 +++++++++++--
.../windowing/StoreBasedTridentWindowManager.java | 14 +++++++-------
.../trident/windowing/WindowTridentProcessor.java | 15 ++++++++++-----
.../apache/storm/trident/windowing/WindowsState.java | 6 +++---
.../storm/trident/windowing/WindowsStateUpdater.java | 8 ++++----
10 files changed, 55 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
index 24cc8d9..0ebaa1f 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
@@ -23,7 +23,6 @@ import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
-import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
@@ -44,13 +43,12 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
/**
*
*/
public class TridentHBaseWindowingStoreTopology {
- private static final Logger log = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class);
public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception {
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
@@ -84,7 +82,7 @@ public class TridentHBaseWindowingStoreTopology {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
- log.info("##########Echo.execute: " + tuple);
+ LOG.info("##########Echo.execute: " + tuple);
collector.emit(tuple.getValues());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
index 5f0cb4f..a2455a0 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
@@ -25,7 +25,6 @@ import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.Function;
import org.apache.storm.trident.operation.TridentCollector;
@@ -53,10 +52,10 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
- *
+ * Sample application of trident windowing which uses inmemory store for storing tuples in window.
*/
public class TridentWindowingInmemoryStoreTopology {
- private static final Logger log = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class);
public static StormTopology buildTopology(WindowsStoreFactory windowStore, WindowConfig windowConfig) throws Exception {
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
@@ -91,7 +90,7 @@ public class TridentWindowingInmemoryStoreTopology {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
- log.info("##########Echo.execute: " + tuple);
+ LOG.info("##########Echo.execute: " + tuple);
collector.emit(tuple.getValues());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
index 47879a4..ff3fbf9 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -51,7 +51,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*
*/
public class HBaseWindowsStore implements WindowsStore {
- private static final Logger log = LoggerFactory.getLogger(HBaseWindowsStore.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class);
public static final String UTF_8 = "utf-8";
private final ThreadLocal<HTable> threadLocalHtable;
@@ -136,7 +136,7 @@ public class HBaseWindowsStore implements WindowsStore {
for (int i=0; i<results.length; i++) {
Result result = results[i];
if(result.isEmpty()) {
- log.error("Got empty result for key [{}]", keys.get(i));
+ LOG.error("Got empty result for key [{}]", keys.get(i));
throw new RuntimeException("Received empty result for key: "+keys.get(i));
}
Input input = new Input(result.getValue(family, qualifier));
@@ -267,7 +267,7 @@ public class HBaseWindowsStore implements WindowsStore {
try {
htable.close();
} catch (IOException e) {
- log.error(e.getMessage(), e);
+ LOG.error(e.getMessage(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
index 2941e28..fd7a957 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -47,7 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*
*/
public abstract class AbstractTridentWindowManager<T> implements ITridentWindowManager {
- private static final Logger log = LoggerFactory.getLogger(AbstractTridentWindowManager.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractTridentWindowManager.class);
protected final WindowManager<T> windowManager;
protected final Aggregator aggregator;
@@ -89,12 +89,12 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
}
private void preInitialize() {
- log.debug("Getting current trigger count for this component/task");
+ LOG.debug("Getting current trigger count for this component/task");
// get trigger count value from store
Object result = windowStore.get(windowTriggerCountId);
Integer currentCount = 0;
if(result == null) {
- log.info("No current trigger count in windows store.");
+ LOG.info("No current trigger count in windows store.");
} else {
currentCount = (Integer) result + 1;
}
@@ -119,13 +119,13 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
@Override
public void onExpiry(List<T> expiredEvents) {
- log.debug("onExpiry is invoked");
+ LOG.debug("onExpiry is invoked");
onTuplesExpired(expiredEvents);
}
@Override
public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
- log.debug("onActivation is invoked with events size: {}", events.size());
+ LOG.debug("onActivation is invoked with events size: [{}]", events.size());
// trigger occurred, create an aggregation and keep them in store
int currentTriggerId = triggerId.incrementAndGet();
execAggregatorAndStoreResult(currentTriggerId, events);
@@ -230,10 +230,10 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
public void shutdown() {
try {
- log.info("window manager [{}] is being shutdown", windowManager);
+ LOG.info("window manager [{}] is being shutdown", windowManager);
windowManager.shutdown();
} finally {
- log.info("window store [{}] is being shutdown", windowStore);
+ LOG.info("window store [{}] is being shutdown", windowStore);
windowStore.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
index cbb30af..e47cc9a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
@@ -33,7 +33,7 @@ import java.util.List;
* This {@code ITridentWindowManager} instance stores all the tuples and trigger related information inmemory.
*/
public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<TridentTuple> {
- private static final Logger log = LoggerFactory.getLogger(InMemoryTridentWindowManager.class);
+ private static final Logger LOG = LoggerFactory.getLogger(InMemoryTridentWindowManager.class);
public InMemoryTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator,
BatchOutputCollector delegateCollector) {
@@ -42,7 +42,7 @@ public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<T
@Override
protected void initialize() {
- log.debug("noop in initialize");
+ LOG.debug("noop in initialize");
}
@Override
@@ -52,17 +52,17 @@ public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<T
@Override
public void onTuplesExpired(List<TridentTuple> expiredTuples) {
- log.debug("InMemoryTridentWindowManager.onTuplesExpired");
+ LOG.debug("InMemoryTridentWindowManager.onTuplesExpired");
}
public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
// check if they are already added then ignore these tuples. This batch is replayed.
if (activeBatches.contains(getBatchTxnId(batchId))) {
- log.info("Ignoring already added tuples with batch: %s", batchId);
+ LOG.info("Ignoring already added tuples with batch: [{}]", batchId);
return;
}
- log.debug("Adding tuples to window-manager for batch: ", batchId);
+ LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
for (TridentTuple tridentTuple : tuples) {
windowManager.add(tridentTuple);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
index 32027a9..cf65594 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
@@ -18,9 +18,18 @@
*/
package org.apache.storm.trident.windowing;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+
/**
- * InMemoryWindowsStoreFactory contains a single instance of {@code InMemoryWindowsStore} which will be used for
- * storing tuples and triggers of the window and successfully emitted triggers can be removed from {@code StateUpdater}.
+ * InMemoryWindowsStoreFactory contains a single instance of {@link InMemoryWindowsStore} which will be used for
+ * storing tuples and triggers of the window. The same InMemoryWindowsStoreFactory instance is passed to {@link WindowsStateUpdater},
+ * which removes successfully emitted triggers from the same {@code inMemoryWindowsStore} instance in
+ * {@link WindowsStateUpdater#updateState(WindowsState, List, TridentCollector)}.
*
*/
public class InMemoryWindowsStoreFactory implements WindowsStoreFactory {
http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
index 885e508..58b24a2 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
@@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicLong;
*
*/
public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager<TridentBatchTuple> {
- private static final Logger log = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class);
private static final String TUPLE_PREFIX = "tu" + WindowsStore.KEY_SEPARATOR;
@@ -75,14 +75,14 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
if (key.startsWith(windowTupleTaskId)) {
int tupleIndexValue = lastPart(key);
String batchId = secondLastPart(key);
- log.debug("Received tuple with batch [{}] and tuple index [{}]", batchId, tupleIndexValue);
+ LOG.debug("Received tuple with batch [{}] and tuple index [{}]", batchId, tupleIndexValue);
windowManager.add(new TridentBatchTuple(batchId, System.currentTimeMillis(), tupleIndexValue));
} else if (key.startsWith(windowTriggerTaskId)) {
triggerKeys.add(key);
- log.debug("Received trigger with key [{}]", key);
+ LOG.debug("Received trigger with key [{}]", key);
} else if(key.startsWith(windowTriggerInprocessId)) {
attemptedTriggerKeys.add(key);
- log.debug("Received earlier unsuccessful trigger [{}] from windows store [{}]", key);
+ LOG.debug("Received earlier unsuccessful trigger [{}] from windows store [{}]", key);
}
}
@@ -99,7 +99,7 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
for (Object triggerObject : triggerObjects) {
int id = lastPart(triggerKeys.get(i++));
if(!triggersToBeIgnored.contains(id)) {
- log.info("Adding pending trigger value [{}]", triggerObject);
+ LOG.info("Adding pending trigger value [{}]", triggerObject);
pendingTriggers.add(new TriggerResult(id, (List<List<Object>>) triggerObject));
}
}
@@ -131,11 +131,11 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
// check if they are already added then ignore these tuples. This batch is replayed.
if (activeBatches.contains(getBatchTxnId(batchId))) {
- log.info("Ignoring already added tuples with batch: %s", batchId);
+ LOG.info("Ignoring already added tuples with batch: [{}]", batchId);
return;
}
- log.debug("Adding tuples to window-manager for batch: ", batchId);
+ LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
List<WindowsStore.Entry> entries = new ArrayList<>();
for (int i = 0; i < tuples.size(); i++) {
String key = keyOf(batchId);
http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
index c2d9362..8898b13 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
@@ -46,7 +46,7 @@ import java.util.Queue;
*
*/
public class WindowTridentProcessor implements TridentProcessor {
- private static final Logger log = LoggerFactory.getLogger(WindowTridentProcessor.class);
+ private static final Logger LOG = LoggerFactory.getLogger(WindowTridentProcessor.class);
public static final String TRIGGER_INPROCESS_PREFIX = "tip" + WindowsStore.KEY_SEPARATOR;
public static final String TRIGGER_PREFIX = "tr" + WindowsStore.KEY_SEPARATOR;
@@ -127,8 +127,13 @@ public class WindowTridentProcessor implements TridentProcessor {
@Override
public void cleanup() {
- log.info("shutting down window manager");
- tridentWindowManager.shutdown();
+ LOG.info("shutting down window manager");
+ try {
+ tridentWindowManager.shutdown();
+ } catch (Exception ex) {
+ LOG.error("Error occurred while cleaning up window processor", ex);
+ throw ex;
+ }
}
@Override
@@ -150,7 +155,7 @@ public class WindowTridentProcessor implements TridentProcessor {
Object batchId = processorContext.batchId;
Object batchTxnId = getBatchTxnId(batchId);
- log.debug("Received finishBatch of : {} ", batchId);
+ LOG.debug("Received finishBatch of : [{}] ", batchId);
// get all the tuples in a batch and add it to trident-window-manager
List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
tridentWindowManager.addTuplesBatch(batchId, tuples);
@@ -171,7 +176,7 @@ public class WindowTridentProcessor implements TridentProcessor {
if(triggerValues == null) {
pendingTriggerIds = new ArrayList<>();
Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
- log.debug("pending triggers at batch: {} and triggers.size: {} ", batchId, pendingTriggers.size());
+ LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
try {
Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
List<Object> values = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
index 378d24f..faf73d6 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
*
*/
public class WindowsState implements State {
- private static final Logger log = LoggerFactory.getLogger(WindowsState.class);
+ private static final Logger LOG = LoggerFactory.getLogger(WindowsState.class);
private Long currentTxId;
@@ -38,12 +38,12 @@ public class WindowsState implements State {
@Override
public void beginCommit(Long txId) {
currentTxId = txId;
- log.debug(" WindowsState.beginCommit:: [{}] ", txId);
+ LOG.debug(" WindowsState.beginCommit:: [{}] ", txId);
}
@Override
public void commit(Long txId) {
- log.debug("WindowsState.commit :: [{}]", txId);
+ LOG.debug("WindowsState.commit :: [{}]", txId);
}
public Long getCurrentTxId() {
http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
index 45ac885..6664b41 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
@@ -36,7 +36,7 @@ import java.util.Map;
*/
public class WindowsStateUpdater implements StateUpdater<WindowsState> {
- private static final Logger log = LoggerFactory.getLogger(WindowsStateUpdater.class);
+ private static final Logger LOG = LoggerFactory.getLogger(WindowsStateUpdater.class);
private final WindowsStoreFactory windowStoreFactory;
private WindowsStore windowsStore;
@@ -48,7 +48,7 @@ public class WindowsStateUpdater implements StateUpdater<WindowsState> {
@Override
public void updateState(WindowsState state, List<TridentTuple> tuples, TridentCollector collector) {
Long currentTxId = state.getCurrentTxId();
- log.debug("Removing triggers using WindowStateUpdater, txnId: {} ", currentTxId);
+ LOG.debug("Removing triggers using WindowStateUpdater, txnId: [{}] ", currentTxId);
for (TridentTuple tuple : tuples) {
try {
Object fieldValue = tuple.getValueByField(WindowTridentProcessor.TRIGGER_FIELD_NAME);
@@ -58,11 +58,11 @@ public class WindowsStateUpdater implements StateUpdater<WindowsState> {
WindowTridentProcessor.TriggerInfo triggerInfo = (WindowTridentProcessor.TriggerInfo) fieldValue;
String triggerCompletedKey = WindowTridentProcessor.getWindowTriggerInprocessIdPrefix(triggerInfo.windowTaskId)+currentTxId;
- log.debug("Removing trigger key [{}] and trigger completed key [{}] from store: [{}]", triggerInfo, triggerCompletedKey, windowsStore);
+ LOG.debug("Removing trigger key [{}] and trigger completed key [{}] from store: [{}]", triggerInfo, triggerCompletedKey, windowsStore);
windowsStore.removeAll(Lists.newArrayList(triggerInfo.generateTriggerKey(), triggerCompletedKey));
} catch (Exception ex) {
- log.warn(ex.getMessage());
+ LOG.warn(ex.getMessage());
collector.reportError(ex);
throw new FailedException(ex);
}
[4/9] storm git commit: STORM-676 Addressed review comments on API aligning with core window API
Posted by sr...@apache.org.
STORM-676 Addressed review comments on API aligning with core window API
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dd02bcfd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dd02bcfd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dd02bcfd
Branch: refs/heads/1.x-branch
Commit: dd02bcfdcfe6f92fef055a933722a7b485c8e613
Parents: 532bb79
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Tue Mar 15 14:18:30 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Sun Mar 27 10:46:26 2016 +0530
----------------------------------------------------------------------
.../jvm/org/apache/storm/trident/Stream.java | 22 ++++----------------
1 file changed, 4 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/dd02bcfd/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index b3a4446..47b087a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -611,20 +611,6 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
/**
* Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
*
- * @param windowCount represents window tuples count
- * @param inputFields projected fields for aggregator
- * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
- * @param functionFields fields of values to emit with aggregation.
- *
- * @return
- */
- public Stream tumblingCountWindow(int windowCount, Fields inputFields, Aggregator aggregator, Fields functionFields) {
- return window(TumblingCountWindow.of(windowCount), inputFields, aggregator, functionFields);
- }
-
- /**
- * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
- *
* @param windowCount represents no of tuples in the window
* @param windowStoreFactory intermediary tuple store for storing windowing tuples
* @param inputFields projected fields for aggregator
@@ -633,7 +619,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
*
* @return
*/
- public Stream tumblingCountWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+ public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
Fields inputFields, Aggregator aggregator, Fields functionFields) {
return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
}
@@ -651,7 +637,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
*
* @return
*/
- public Stream slidingCountWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+ public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
Fields inputFields, Aggregator aggregator, Fields functionFields) {
return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
}
@@ -667,7 +653,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
*
* @return
*/
- public Stream tumblingTimeWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
+ public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
Fields inputFields, Aggregator aggregator, Fields functionFields) {
return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
}
@@ -685,7 +671,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
*
* @return
*/
- public Stream slidingTimeWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slideDuration,
+ public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slideDuration,
WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
return window(SlidingDurationWindow.of(windowDuration, slideDuration), windowStoreFactory, inputFields, aggregator, functionFields);
}
[5/9] storm git commit: STORM-676 Addressed review comments from Arun
Posted by sr...@apache.org.
STORM-676 Addressed review comments from Arun
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3a96f20f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3a96f20f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3a96f20f
Branch: refs/heads/1.x-branch
Commit: 3a96f20f09b3eaccc59d333a581c3b0c7d345ccc
Parents: dd02bcf
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Wed Mar 23 11:35:37 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Sun Mar 27 10:46:49 2016 +0530
----------------------------------------------------------------------
examples/storm-starter/pom.xml | 31 ------------
.../TridentHBaseWindowingStoreTopology.java | 49 ++++--------------
.../TridentWindowingInmemoryStoreTopology.java | 53 ++++----------------
.../windowing/HBaseWindowsStoreFactory.java | 3 ++
.../jvm/org/apache/storm/trident/Stream.java | 12 ++---
.../windowing/AbstractTridentWindowManager.java | 1 -
.../windowing/InMemoryTridentWindowManager.java | 6 ---
.../StoreBasedTridentWindowManager.java | 6 ---
8 files changed, 28 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index e702a5d..6053595 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -35,7 +35,6 @@
<!-- see comment below... This fixes an annoyance with intellij -->
<provided.scope>provided</provided.scope>
<hbase.version>0.98.4-hadoop2</hbase.version>
- <hbase.version>1.1.2</hbase.version>
</properties>
<profiles>
@@ -175,36 +174,6 @@
<artifactId>storm-redis</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <version>${hbase.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>${hbase.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
index 0ebaa1f..ba18f7c 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
@@ -25,13 +25,10 @@ import org.apache.storm.generated.StormTopology;
import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.Function;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.operation.Consumer;
import org.apache.storm.trident.testing.CountAsAggregator;
import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.Split;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.trident.windowing.WindowsStoreFactory;
import org.apache.storm.trident.windowing.config.TumblingCountWindow;
@@ -42,9 +39,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
-import java.util.Map;
/**
+ * Sample application of trident windowing which uses {@link HBaseWindowsStoreFactory}'s store for storing tuples in window.
*
*/
public class TridentHBaseWindowingStoreTopology {
@@ -61,46 +58,20 @@ public class TridentHBaseWindowingStoreTopology {
Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
new Split(), new Fields("word"))
.window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
-// .tumblingTimeWindow(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
- .each(new Fields("count"), new Debug())
- .each(new Fields("count"), new Echo(), new Fields("ct"));
+ .peek(new Consumer() {
+ @Override
+ public void accept(TridentTuple input) {
+ LOG.info("Received tuple: [{}]", input);
+ }
+ });
return topology.build();
}
- public static class Split extends BaseFunction {
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- String sentence = tuple.getString(0);
- for (String word : sentence.split(" ")) {
- collector.emit(new Values(word));
- }
- }
- }
-
- static class Echo implements Function {
-
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- LOG.info("##########Echo.execute: " + tuple);
- collector.emit(tuple.getValues());
- }
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
-
- }
-
- @Override
- public void cleanup() {
-
- }
- }
-
public static void main(String[] args) throws Exception {
Config conf = new Config();
conf.setMaxSpoutPending(20);
- conf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 2);
+ conf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 100);
// window-state table should already be created with cf:tuples column
HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
index a2455a0..5aec01d 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
@@ -25,21 +25,14 @@ import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.Function;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.operation.Consumer;
import org.apache.storm.trident.testing.CountAsAggregator;
import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.Split;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
import org.apache.storm.trident.windowing.WindowsStoreFactory;
-import org.apache.storm.trident.windowing.config.SlidingCountWindow;
-import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
-import org.apache.storm.trident.windowing.config.TumblingCountWindow;
-import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
-import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.trident.windowing.config.*;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
@@ -48,7 +41,6 @@ import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
@@ -68,43 +60,16 @@ public class TridentWindowingInmemoryStoreTopology {
Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
new Split(), new Fields("word"))
.window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
-// .aggregate(new CountAsAggregator(), new Fields("count"))
- .each(new Fields("count"), new Debug())
- .each(new Fields("count"), new Echo(), new Fields("ct"))
- .each(new Fields("ct"), new Debug());
+ .peek(new Consumer() {
+ @Override
+ public void accept(TridentTuple input) {
+ LOG.info("Received tuple: [{}]", input);
+ }
+ });
return topology.build();
}
- public static class Split extends BaseFunction {
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- String sentence = tuple.getString(0);
- for (String word : sentence.split(" ")) {
- collector.emit(new Values(word));
- }
- }
- }
-
- static class Echo implements Function {
-
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- LOG.info("##########Echo.execute: " + tuple);
- collector.emit(tuple.getValues());
- }
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
-
- }
-
- @Override
- public void cleanup() {
-
- }
- }
-
public static void main(String[] args) throws Exception {
Config conf = new Config();
WindowsStoreFactory mapState = new InMemoryWindowsStoreFactory();
http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
index 56fad58..a49bc87 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
@@ -25,6 +25,9 @@ import org.apache.storm.trident.windowing.WindowsStoreFactory;
import java.util.Map;
+/**
+ *
+ */
public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
private final Map<String, Object> config;
private final String tableName;
http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index 47b087a..444b42a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p/>
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -629,7 +629,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
* and slides the window with {@code slideCount}.
*
* @param windowCount represents tuples count of a window
- * @param slideCount represents sliding count window
+ * @param slideCount the number of tuples after which the window slides
* @param windowStoreFactory intermediary tuple store for storing windowing tuples
* @param inputFields projected fields for aggregator
* @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
@@ -663,7 +663,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
* and completes a window at {@code windowDuration}
*
* @param windowDuration represents window duration configuration
- * @param slideDuration represents sliding duration configuration
+ * @param slidingInterval the time duration after which the window slides
* @param windowStoreFactory intermediary tuple store for storing windowing tuples
* @param inputFields projected fields for aggregator
* @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
@@ -671,9 +671,9 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
*
* @return
*/
- public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slideDuration,
+ public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,
WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
- return window(SlidingDurationWindow.of(windowDuration, slideDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+ return window(SlidingDurationWindow.of(windowDuration, slidingInterval), windowStoreFactory, inputFields, aggregator, functionFields);
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
index fd7a957..aac18d3 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -55,7 +55,6 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
protected final String windowTaskId;
protected final WindowsStore windowStore;
- protected final Set<String> activeBatches = new HashSet<>();
protected final Queue<TriggerResult> pendingTriggers = new ConcurrentLinkedQueue<>();
protected final AtomicInteger triggerId = new AtomicInteger();
private final String windowTriggerCountId;
http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
index e47cc9a..69eb39e 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
@@ -56,12 +56,6 @@ public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<T
}
public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
- // check if they are already added then ignore these tuples. This batch is replayed.
- if (activeBatches.contains(getBatchTxnId(batchId))) {
- LOG.info("Ignoring already added tuples with batch: [{}]", batchId);
- return;
- }
-
LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
for (TridentTuple tridentTuple : tuples) {
windowManager.add(tridentTuple);
http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
index 58b24a2..87c1a0f 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
@@ -129,12 +129,6 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
}
public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
- // check if they are already added then ignore these tuples. This batch is replayed.
- if (activeBatches.contains(getBatchTxnId(batchId))) {
- LOG.info("Ignoring already added tuples with batch: [{}]", batchId);
- return;
- }
-
LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
List<WindowsStore.Entry> entries = new ArrayList<>();
for (int i = 0; i < tuples.size(); i++) {