You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/08/25 18:31:32 UTC

[2/4] incubator-streams git commit: resolves STREAMS-348

resolves STREAMS-348


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/16790314
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/16790314
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/16790314

Branch: refs/heads/master
Commit: 16790314416842c2e0cbeaf2a0a579a7a016b4ec
Parents: 6683683
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Sun Jul 26 13:28:48 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Sun Jul 26 13:28:48 2015 -0500

----------------------------------------------------------------------
 .../apache/streams/config/StreamsConfigurator.java  | 16 ++++++++++++++++
 streams-core/pom.xml                                |  5 +++++
 .../java/org/apache/streams/core/StreamBuilder.java |  7 ++++++-
 .../streams/local/builders/LocalStreamBuilder.java  | 12 ++++++++++++
 .../streams/local/builders/StreamComponent.java     |  3 ++-
 .../org/apache/streams/pig/StreamsPigBuilder.java   | 12 ++++++++++++
 6 files changed, 53 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/16790314/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java b/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
index 9da0260..95b0b04 100644
--- a/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
+++ b/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
@@ -66,4 +66,20 @@ public class StreamsConfigurator {
 
         return pojoConfig;
     }
+
+    public static StreamsConfiguration mergeConfigurations(Config base, Config delta) {
+
+        Config merged = delta.withFallback(base);
+
+        StreamsConfiguration pojoConfig = null;
+
+        try {
+            pojoConfig = mapper.readValue(merged.root().render(ConfigRenderOptions.concise()), StreamsConfiguration.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Failed to merge.");
+        }
+
+        return pojoConfig;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/16790314/streams-core/pom.xml
----------------------------------------------------------------------
diff --git a/streams-core/pom.xml b/streams-core/pom.xml
index a478895..a57db76 100644
--- a/streams-core/pom.xml
+++ b/streams-core/pom.xml
@@ -35,6 +35,11 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
             <artifactId>streams-util</artifactId>
             <version>${project.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/16790314/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java b/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
index d942a94..39bc937 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
@@ -18,8 +18,10 @@
 
 package org.apache.streams.core;
 
+import org.apache.streams.config.StreamsConfiguration;
 import org.joda.time.DateTime;
 
+import java.io.Serializable;
 import java.math.BigInteger;
 
 /**
@@ -35,8 +37,11 @@ import java.math.BigInteger;
  * </pre>
  *
  */
-public interface StreamBuilder {
+public interface StreamBuilder extends Serializable {
 
+    public StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration);
+
+    public StreamsConfiguration getStreamsConfiguration();
 
     /**
      * Add a {@link org.apache.streams.core.StreamsProcessor} to the data processing stream.

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/16790314/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 4cf078e..f31c7ed 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -24,6 +24,7 @@ import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.*;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.counters.StreamsTaskCounter;
 import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
@@ -199,6 +200,17 @@ public class LocalStreamBuilder implements StreamBuilder {
     }
 
     @Override
+    public StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration) {
+        streamConfig = StreamsJacksonMapper.getInstance().convertValue(configuration, LocalRuntimeConfiguration.class);
+        return this;
+    }
+
+    @Override
+    public StreamsConfiguration getStreamsConfiguration() {
+        return StreamsJacksonMapper.getInstance().convertValue(streamConfig, StreamsConfiguration.class);
+    }
+
+    @Override
     public StreamBuilder addStreamsProcessor(String id, StreamsProcessor processor, int numTasks, String... inBoundIds) {
         validateId(id);
         StreamComponent comp = new StreamComponent(id, processor, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id, streamIdentifier, startedAt.getMillis()), numTasks, streamConfig);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/16790314/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
index 4f2a507..9d602cd 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
@@ -28,6 +28,7 @@ import org.apache.streams.local.tasks.StreamsTask;
 import org.apache.streams.util.SerializationUtil;
 import org.joda.time.DateTime;
 
+import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
@@ -36,7 +37,7 @@ import java.util.concurrent.BlockingQueue;
  * Stores the implementations of {@link org.apache.streams.core.StreamsOperation}, the StreamsOperations it is connected
  * to and the necessary metadata to construct a data stream.
  */
-public class StreamComponent {
+public class StreamComponent implements Serializable {
 
     private static final int START = 1;
     private static final int END = 2;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/16790314/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java
index c038039..5ff4145 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java
@@ -19,6 +19,7 @@
 
 package org.apache.streams.pig;
 
+import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.core.StreamsProcessor;
@@ -34,6 +35,17 @@ import java.math.BigInteger;
  * Currently implementers must write own pig scripts to use this module
  */
 public class StreamsPigBuilder implements StreamBuilder {
+
+    @Override
+    public StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration) {
+        return null;
+    }
+
+    @Override
+    public StreamsConfiguration getStreamsConfiguration() {
+        return null;
+    }
+
     @Override
     public StreamBuilder addStreamsProcessor(String s, StreamsProcessor streamsProcessor, int i, String... strings) {
         return null;