You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/08/25 18:31:32 UTC
[2/4] incubator-streams git commit: resolves STREAMS-348
resolves STREAMS-348
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/16790314
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/16790314
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/16790314
Branch: refs/heads/master
Commit: 16790314416842c2e0cbeaf2a0a579a7a016b4ec
Parents: 6683683
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Sun Jul 26 13:28:48 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Sun Jul 26 13:28:48 2015 -0500
----------------------------------------------------------------------
.../apache/streams/config/StreamsConfigurator.java | 16 ++++++++++++++++
streams-core/pom.xml | 5 +++++
.../java/org/apache/streams/core/StreamBuilder.java | 7 ++++++-
.../streams/local/builders/LocalStreamBuilder.java | 12 ++++++++++++
.../streams/local/builders/StreamComponent.java | 3 ++-
.../org/apache/streams/pig/StreamsPigBuilder.java | 12 ++++++++++++
6 files changed, 53 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/16790314/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java b/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
index 9da0260..95b0b04 100644
--- a/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
+++ b/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
@@ -66,4 +66,20 @@ public class StreamsConfigurator {
return pojoConfig;
}
+
+    /**
+     * Overlays {@code delta} on top of {@code base} and binds the merged result to a
+     * {@link StreamsConfiguration}.
+     *
+     * Values present in {@code delta} take precedence; {@code base} is only consulted as the
+     * fallback (see {@code Config.withFallback}).
+     *
+     * @param base  configuration supplying fallback values
+     * @param delta configuration whose values override {@code base}
+     * @return the bound configuration, or {@code null} if the merged config could not be
+     *         bound (the failure is logged at WARN together with its cause)
+     */
+    public static StreamsConfiguration mergeConfigurations(Config base, Config delta) {
+
+        Config merged = delta.withFallback(base);
+
+        StreamsConfiguration pojoConfig = null;
+
+        try {
+            // Render the merged tree in concise form and let the mapper bind it to the POJO.
+            String rendered = merged.root().render(ConfigRenderOptions.concise());
+            pojoConfig = mapper.readValue(rendered, StreamsConfiguration.class);
+        } catch (Exception e) {
+            // Pass the exception to the logger rather than printStackTrace() so the cause
+            // ends up in the configured log output instead of raw stderr.
+            LOGGER.warn("Failed to merge.", e);
+        }
+
+        return pojoConfig;
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/16790314/streams-core/pom.xml
----------------------------------------------------------------------
diff --git a/streams-core/pom.xml b/streams-core/pom.xml
index a478895..a57db76 100644
--- a/streams-core/pom.xml
+++ b/streams-core/pom.xml
@@ -35,6 +35,11 @@
<dependencies>
<dependency>
<groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
<artifactId>streams-util</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/16790314/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java b/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
index d942a94..39bc937 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
@@ -18,8 +18,10 @@
package org.apache.streams.core;
+import org.apache.streams.config.StreamsConfiguration;
import org.joda.time.DateTime;
+import java.io.Serializable;
import java.math.BigInteger;
/**
@@ -35,8 +37,11 @@ import java.math.BigInteger;
* </pre>
*
*/
-public interface StreamBuilder {
+public interface StreamBuilder extends Serializable {
+ public StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration);
+
+ public StreamsConfiguration getStreamsConfiguration();
/**
* Add a {@link org.apache.streams.core.StreamsProcessor} to the data processing stream.
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/16790314/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 4cf078e..f31c7ed 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -24,6 +24,7 @@ import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.*;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.local.LocalRuntimeConfiguration;
import org.apache.streams.local.counters.StreamsTaskCounter;
import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
@@ -199,6 +200,17 @@ public class LocalStreamBuilder implements StreamBuilder {
}
@Override
+    public StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration) {
+        // Convert the supplied configuration into the local runtime's configuration type
+        // and keep it as this builder's stream configuration.
+        LocalRuntimeConfiguration localConfig =
+                StreamsJacksonMapper.getInstance().convertValue(configuration, LocalRuntimeConfiguration.class);
+        this.streamConfig = localConfig;
+        return this;
+    }
+
+    /**
+     * Returns this builder's stream configuration converted to a {@link StreamsConfiguration}.
+     *
+     * @return the result of converting the stored stream configuration
+     */
+    @Override
+    public StreamsConfiguration getStreamsConfiguration() {
+        StreamsConfiguration converted =
+                StreamsJacksonMapper.getInstance().convertValue(this.streamConfig, StreamsConfiguration.class);
+        return converted;
+    }
+
+ @Override
public StreamBuilder addStreamsProcessor(String id, StreamsProcessor processor, int numTasks, String... inBoundIds) {
validateId(id);
StreamComponent comp = new StreamComponent(id, processor, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id, streamIdentifier, startedAt.getMillis()), numTasks, streamConfig);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/16790314/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
index 4f2a507..9d602cd 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
@@ -28,6 +28,7 @@ import org.apache.streams.local.tasks.StreamsTask;
import org.apache.streams.util.SerializationUtil;
import org.joda.time.DateTime;
+import java.io.Serializable;
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.BlockingQueue;
@@ -36,7 +37,7 @@ import java.util.concurrent.BlockingQueue;
* Stores the implementations of {@link org.apache.streams.core.StreamsOperation}, the StreamsOperations it is connected
* to and the necessary metadata to construct a data stream.
*/
-public class StreamComponent {
+public class StreamComponent implements Serializable {
private static final int START = 1;
private static final int END = 2;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/16790314/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java
index c038039..5ff4145 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java
@@ -19,6 +19,7 @@
package org.apache.streams.pig;
+import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.core.StreamBuilder;
import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.core.StreamsProcessor;
@@ -34,6 +35,17 @@ import java.math.BigInteger;
* Currently implementers must write own pig scripts to use this module
*/
public class StreamsPigBuilder implements StreamBuilder {
+
+ /**
+ * Stub implementation: the supplied configuration is ignored and null is returned.
+ * NOTE(review): returning null means callers cannot chain further builder calls on the
+ * result - confirm no caller relies on fluent chaining with this builder.
+ */
+ @Override
+ public StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration) {
+ return null;
+ }
+
+ /**
+ * Stub implementation: no configuration is held by this builder, so this always returns null.
+ */
+ @Override
+ public StreamsConfiguration getStreamsConfiguration() {
+ return null;
+ }
+
@Override
public StreamBuilder addStreamsProcessor(String s, StreamsProcessor streamsProcessor, int i, String... strings) {
return null;