You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/20 19:43:42 UTC

incubator-streams git commit: STREAMS-421: Delete defunct or not-implemented runtime modules

Repository: incubator-streams
Updated Branches:
  refs/heads/master 7662e2733 -> 11e3a0f1b


STREAMS-421: Delete defunct or not-implemented runtime modules


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/11e3a0f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/11e3a0f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/11e3a0f1

Branch: refs/heads/master
Commit: 11e3a0f1b78e548c7678a91f6793c8568cfca0c9
Parents: 7662e27
Author: smarthi <sm...@apache.org>
Authored: Thu Oct 20 14:19:30 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Thu Oct 20 14:19:30 2016 -0400

----------------------------------------------------------------------
 streams-runtimes/pom.xml                        |   1 -
 .../streams-runtime-storm/README.md             |   8 --
 .../trident/StreamsPersistWriterState.java      | 124 -------------------
 .../storm/trident/StreamsProcessorFunction.java |  72 -----------
 .../storm/trident/StreamsProviderSpout.java     |  85 -------------
 .../streams/storm/trident/StreamsTopology.java  |  73 -----------
 6 files changed, 363 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/pom.xml b/streams-runtimes/pom.xml
index 0194b45..ed18c65 100644
--- a/streams-runtimes/pom.xml
+++ b/streams-runtimes/pom.xml
@@ -37,6 +37,5 @@
         <module>streams-runtime-dropwizard</module>
         <module>streams-runtime-local</module>
         <module>streams-runtime-pig</module>
-        <module>streams-runtime-storm</module>
     </modules>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/streams-runtime-storm/README.md
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/README.md b/streams-runtimes/streams-runtime-storm/README.md
deleted file mode 100644
index 08d0c3d..0000000
--- a/streams-runtimes/streams-runtime-storm/README.md
+++ /dev/null
@@ -1,8 +0,0 @@
-Apache Streams (incubating)
-Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
---------------------------------------------------------------------------------
-
-org.apache.streams:streams-runtime-storm
-========================================
-
-[README.md](src/site/markdown/index.md "README")

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsPersistWriterState.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsPersistWriterState.java b/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsPersistWriterState.java
deleted file mode 100644
index e8b4a09..0000000
--- a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsPersistWriterState.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.storm.trident;
-
-import backtype.storm.task.IMetricsContext;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-import storm.trident.tuple.TridentTuple;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by sblackmon on 1/16/14.
- */
-public class StreamsPersistWriterState implements State {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(StreamsPersistWriterState.class);
-
-    StreamsPersistWriter writer;
-    StreamsPersistStateController controller;
-
-    public StreamsPersistWriterState(StreamsPersistStateController controller) {
-        this.controller = new StreamsPersistStateController();
-        writer.prepare(null);
-    }
-
-    public void bulkMessages(List<TridentTuple> tuples) {
-        for (TridentTuple tuple : tuples) {
-            StreamsDatum entry = this.controller.fromTuple(tuple);
-            try {
-                writer.write(entry);
-            } catch (Exception e) {
-                LOGGER.error("Exception writing entry : {}", e, entry);
-            }
-        }
-        LOGGER.debug("******** Ending commit");
-    }
-
-    @Override
-    public void beginCommit(Long aLong) {
-
-    }
-
-    @Override
-    public void commit(Long aLong) {
-
-    }
-
-    public static class Factory implements StateFactory {
-
-        private Logger logger;
-        private StreamsPersistStateController controller;
-
-        public Factory(StreamsPersistWriter writer, StreamsPersistStateController controller) {
-            this.controller = controller;
-            this.logger = LoggerFactory.getLogger(Factory.class);
-        }
-
-        @Override
-        public State makeState(Map map, IMetricsContext iMetricsContext, int i, int i2) {
-            this.logger.debug("Called makeState. . . ");
-            // convert map to config object
-            return new StreamsPersistWriterState(controller);
-        }
-
-    }
-
-    public static class StreamsPersistStateController implements Serializable {
-
-        private String fieldName;
-        private ObjectMapper mapper = new ObjectMapper();
-
-        public StreamsPersistStateController() {
-            this.fieldName = "datum";
-        }
-
-        public StreamsPersistStateController(String fieldName) {
-            this.fieldName = fieldName;
-        }
-
-        public StreamsDatum fromTuple(TridentTuple tuple) {
-            return mapper.convertValue(tuple.getValueByField(this.fieldName), StreamsDatum.class);
-        }
-
-    }
-
-
-
-    public static class StreamsPersistWriterSendMessage extends BaseStateUpdater<StreamsPersistWriterState> {
-
-        private Logger logger = LoggerFactory.getLogger(StreamsPersistWriterSendMessage.class);
-
-        @Override
-        public void updateState(StreamsPersistWriterState writerState, List<TridentTuple> tridentTuples, TridentCollector tridentCollector) {
-            this.logger.debug("****  calling send message. .  .");
-            writerState.bulkMessages(tridentTuples);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProcessorFunction.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProcessorFunction.java b/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProcessorFunction.java
deleted file mode 100644
index 4e183f0..0000000
--- a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProcessorFunction.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.storm.trident;
-
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.core.StreamsProcessor;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.trident.operation.Function;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.TridentTuple;
-
-import java.math.BigInteger;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by sblackmon on 4/6/14.
- */
-public class StreamsProcessorFunction implements Function {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProcessorFunction.class);
-
-    StreamsProcessor processor;
-
-    @Override
-    public void execute(TridentTuple objects, TridentCollector tridentCollector) {
-        StreamsDatum datum = new StreamsDatum(
-                objects.getValueByField("document"),
-                new DateTime(objects.getLongByField("timestamp")),
-                new BigInteger(objects.getStringByField("sequenceid"))
-        );
-        List<StreamsDatum> results = processor.process(datum);
-        for( StreamsDatum result : results ) {
-            tridentCollector.emit( Lists.newArrayList(
-                    datum.getTimestamp(),
-                    datum.getSequenceid(),
-                    datum.getDocument()
-            ));
-        }
-    }
-
-    @Override
-    public void prepare(Map map, TridentOperationContext tridentOperationContext) {
-        processor.prepare(map);
-    }
-
-    @Override
-    public void cleanup() {
-        processor.cleanUp();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java b/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java
deleted file mode 100644
index ea90558..0000000
--- a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.storm.trident;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import com.google.common.collect.Lists;
-import org.apache.commons.collections4.IteratorUtils;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.trident.operation.TridentCollector;
-import storm.trident.spout.IBatchSpout;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by sblackmon on 1/16/14.
- */
-public class StreamsProviderSpout implements IBatchSpout {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProviderSpout.class);
-
-    StreamsProvider provider;
-
-    public StreamsProviderSpout(StreamsProvider provider) {
-        this.provider = provider;
-    }
-
-    @Override
-    public void open(Map map, TopologyContext topologyContext) {
-        provider.prepare(topologyContext);
-    }
-
-    @Override
-    public synchronized void emitBatch(long l, TridentCollector tridentCollector) {
-        List<StreamsDatum> batch;
-        batch = IteratorUtils.toList(provider.readCurrent().iterator());
-        for( StreamsDatum datum : batch ) {
-            tridentCollector.emit( Lists.newArrayList(
-                    datum.getTimestamp(),
-                    datum.getSequenceid(),
-                    datum.getDocument()
-            ));
-        }
-    }
-
-    @Override
-    public void ack(long l) {
-
-    }
-
-    @Override
-    public void close() {
-        provider.cleanUp();
-    }
-
-    @Override
-    public Map getComponentConfiguration() {
-        return null;
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return new Fields("timestamp", "sequenceid", "document");
-    }
-};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsTopology.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsTopology.java b/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsTopology.java
deleted file mode 100644
index 0bc97bd..0000000
--- a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsTopology.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.storm.trident;
-
-import backtype.storm.Config;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import storm.trident.TridentTopology;
-
-/**
- * Created with IntelliJ IDEA.
- * User: sblackmon
- * Date: 9/20/13
- * Time: 5:48 PM
- * To change this template use File | Settings | File Templates.
- */
-public abstract class StreamsTopology extends TridentTopology {
-
-    StreamsConfiguration configuration;
-    Config stormConfig;
-    String runmode;
-
-    protected StreamsTopology() {
-
-        runmode = StreamsConfigurator.config.getConfig("storm").getString("runmode");
-        stormConfig = new Config();
-
-    }
-
-    protected StreamsTopology(StreamsConfiguration configuration) {
-        this.configuration = configuration;
-    }
-
-    public StreamsConfiguration getConfiguration() {
-        return configuration;
-    }
-
-    public void setConfiguration(StreamsConfiguration configuration) {
-        this.configuration = configuration;
-    }
-
-    public String getRunmode() {
-        return runmode;
-    }
-
-    public void setRunmode(String runmode) {
-        this.runmode = runmode;
-    }
-
-    public Config getStormConfig() {
-        return stormConfig;
-    }
-
-    public void setStormConfig(Config stormConfig) {
-        this.stormConfig = stormConfig;
-    }
-}