You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/20 19:43:42 UTC
incubator-streams git commit: STREAMS-421: Delete defunct or
not-implemented runtime modules
Repository: incubator-streams
Updated Branches:
refs/heads/master 7662e2733 -> 11e3a0f1b
STREAMS-421: Delete defunct or not-implemented runtime modules
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/11e3a0f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/11e3a0f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/11e3a0f1
Branch: refs/heads/master
Commit: 11e3a0f1b78e548c7678a91f6793c8568cfca0c9
Parents: 7662e27
Author: smarthi <sm...@apache.org>
Authored: Thu Oct 20 14:19:30 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Thu Oct 20 14:19:30 2016 -0400
----------------------------------------------------------------------
streams-runtimes/pom.xml | 1 -
.../streams-runtime-storm/README.md | 8 --
.../trident/StreamsPersistWriterState.java | 124 -------------------
.../storm/trident/StreamsProcessorFunction.java | 72 -----------
.../storm/trident/StreamsProviderSpout.java | 85 -------------
.../streams/storm/trident/StreamsTopology.java | 73 -----------
6 files changed, 363 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/pom.xml b/streams-runtimes/pom.xml
index 0194b45..ed18c65 100644
--- a/streams-runtimes/pom.xml
+++ b/streams-runtimes/pom.xml
@@ -37,6 +37,5 @@
<module>streams-runtime-dropwizard</module>
<module>streams-runtime-local</module>
<module>streams-runtime-pig</module>
- <module>streams-runtime-storm</module>
</modules>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/streams-runtime-storm/README.md
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/README.md b/streams-runtimes/streams-runtime-storm/README.md
deleted file mode 100644
index 08d0c3d..0000000
--- a/streams-runtimes/streams-runtime-storm/README.md
+++ /dev/null
@@ -1,8 +0,0 @@
-Apache Streams (incubating)
-Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
---------------------------------------------------------------------------------
-
-org.apache.streams:streams-runtime-storm
-========================================
-
-[README.md](src/site/markdown/index.md "README")
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsPersistWriterState.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsPersistWriterState.java b/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsPersistWriterState.java
deleted file mode 100644
index e8b4a09..0000000
--- a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsPersistWriterState.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.storm.trident;
-
-import backtype.storm.task.IMetricsContext;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-import storm.trident.tuple.TridentTuple;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by sblackmon on 1/16/14.
- */
-public class StreamsPersistWriterState implements State {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(StreamsPersistWriterState.class);
-
- StreamsPersistWriter writer;
- StreamsPersistStateController controller;
-
- public StreamsPersistWriterState(StreamsPersistStateController controller) {
- this.controller = new StreamsPersistStateController();
- writer.prepare(null);
- }
-
- public void bulkMessages(List<TridentTuple> tuples) {
- for (TridentTuple tuple : tuples) {
- StreamsDatum entry = this.controller.fromTuple(tuple);
- try {
- writer.write(entry);
- } catch (Exception e) {
- LOGGER.error("Exception writing entry : {}", e, entry);
- }
- }
- LOGGER.debug("******** Ending commit");
- }
-
- @Override
- public void beginCommit(Long aLong) {
-
- }
-
- @Override
- public void commit(Long aLong) {
-
- }
-
- public static class Factory implements StateFactory {
-
- private Logger logger;
- private StreamsPersistStateController controller;
-
- public Factory(StreamsPersistWriter writer, StreamsPersistStateController controller) {
- this.controller = controller;
- this.logger = LoggerFactory.getLogger(Factory.class);
- }
-
- @Override
- public State makeState(Map map, IMetricsContext iMetricsContext, int i, int i2) {
- this.logger.debug("Called makeState. . . ");
- // convert map to config object
- return new StreamsPersistWriterState(controller);
- }
-
- }
-
- public static class StreamsPersistStateController implements Serializable {
-
- private String fieldName;
- private ObjectMapper mapper = new ObjectMapper();
-
- public StreamsPersistStateController() {
- this.fieldName = "datum";
- }
-
- public StreamsPersistStateController(String fieldName) {
- this.fieldName = fieldName;
- }
-
- public StreamsDatum fromTuple(TridentTuple tuple) {
- return mapper.convertValue(tuple.getValueByField(this.fieldName), StreamsDatum.class);
- }
-
- }
-
-
-
- public static class StreamsPersistWriterSendMessage extends BaseStateUpdater<StreamsPersistWriterState> {
-
- private Logger logger = LoggerFactory.getLogger(StreamsPersistWriterSendMessage.class);
-
- @Override
- public void updateState(StreamsPersistWriterState writerState, List<TridentTuple> tridentTuples, TridentCollector tridentCollector) {
- this.logger.debug("**** calling send message. . .");
- writerState.bulkMessages(tridentTuples);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProcessorFunction.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProcessorFunction.java b/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProcessorFunction.java
deleted file mode 100644
index 4e183f0..0000000
--- a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProcessorFunction.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.storm.trident;
-
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.core.StreamsProcessor;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.trident.operation.Function;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.TridentTuple;
-
-import java.math.BigInteger;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by sblackmon on 4/6/14.
- */
-public class StreamsProcessorFunction implements Function {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProcessorFunction.class);
-
- StreamsProcessor processor;
-
- @Override
- public void execute(TridentTuple objects, TridentCollector tridentCollector) {
- StreamsDatum datum = new StreamsDatum(
- objects.getValueByField("document"),
- new DateTime(objects.getLongByField("timestamp")),
- new BigInteger(objects.getStringByField("sequenceid"))
- );
- List<StreamsDatum> results = processor.process(datum);
- for( StreamsDatum result : results ) {
- tridentCollector.emit( Lists.newArrayList(
- datum.getTimestamp(),
- datum.getSequenceid(),
- datum.getDocument()
- ));
- }
- }
-
- @Override
- public void prepare(Map map, TridentOperationContext tridentOperationContext) {
- processor.prepare(map);
- }
-
- @Override
- public void cleanup() {
- processor.cleanUp();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java b/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java
deleted file mode 100644
index ea90558..0000000
--- a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.storm.trident;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import com.google.common.collect.Lists;
-import org.apache.commons.collections4.IteratorUtils;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.trident.operation.TridentCollector;
-import storm.trident.spout.IBatchSpout;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by sblackmon on 1/16/14.
- */
-public class StreamsProviderSpout implements IBatchSpout {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProviderSpout.class);
-
- StreamsProvider provider;
-
- public StreamsProviderSpout(StreamsProvider provider) {
- this.provider = provider;
- }
-
- @Override
- public void open(Map map, TopologyContext topologyContext) {
- provider.prepare(topologyContext);
- }
-
- @Override
- public synchronized void emitBatch(long l, TridentCollector tridentCollector) {
- List<StreamsDatum> batch;
- batch = IteratorUtils.toList(provider.readCurrent().iterator());
- for( StreamsDatum datum : batch ) {
- tridentCollector.emit( Lists.newArrayList(
- datum.getTimestamp(),
- datum.getSequenceid(),
- datum.getDocument()
- ));
- }
- }
-
- @Override
- public void ack(long l) {
-
- }
-
- @Override
- public void close() {
- provider.cleanUp();
- }
-
- @Override
- public Map getComponentConfiguration() {
- return null;
- }
-
- @Override
- public Fields getOutputFields() {
- return new Fields("timestamp", "sequenceid", "document");
- }
-};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsTopology.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsTopology.java b/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsTopology.java
deleted file mode 100644
index 0bc97bd..0000000
--- a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsTopology.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.storm.trident;
-
-import backtype.storm.Config;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import storm.trident.TridentTopology;
-
-/**
- * Created with IntelliJ IDEA.
- * User: sblackmon
- * Date: 9/20/13
- * Time: 5:48 PM
- * To change this template use File | Settings | File Templates.
- */
-public abstract class StreamsTopology extends TridentTopology {
-
- StreamsConfiguration configuration;
- Config stormConfig;
- String runmode;
-
- protected StreamsTopology() {
-
- runmode = StreamsConfigurator.config.getConfig("storm").getString("runmode");
- stormConfig = new Config();
-
- }
-
- protected StreamsTopology(StreamsConfiguration configuration) {
- this.configuration = configuration;
- }
-
- public StreamsConfiguration getConfiguration() {
- return configuration;
- }
-
- public void setConfiguration(StreamsConfiguration configuration) {
- this.configuration = configuration;
- }
-
- public String getRunmode() {
- return runmode;
- }
-
- public void setRunmode(String runmode) {
- this.runmode = runmode;
- }
-
- public Config getStormConfig() {
- return stormConfig;
- }
-
- public void setStormConfig(Config stormConfig) {
- this.stormConfig = stormConfig;
- }
-}