You are viewing a plain-text version of this content; the link to its canonical version ("here") was not preserved in this copy.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/02/27 02:28:49 UTC
svn commit: r1572379 - in /incubator/streams/branches/STREAMS-26:
streams-contrib/streams-persist-elasticsearch/
streams-contrib/streams-persist-hdfs/
streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/
streams-contrib/streams-p...
Author: sblackmon
Date: Thu Feb 27 01:28:48 2014
New Revision: 1572379
URL: http://svn.apache.org/r1572379
Log:
applied ryan's patch for type-aware deepcopy
avoiding needing to serialize member fields of writers
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
incubator/streams/branches/STREAMS-26/streams-core/pom.xml
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml?rev=1572379&r1=1572378&r2=1572379&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml Thu Feb 27 01:28:48 2014
@@ -74,7 +74,7 @@
<generateBuilders>true</generateBuilders>
<sourcePaths>
<sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json</sourcePath>
- <sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json</sourcePath>
+ <sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/HdfsWriterConfiguration.json</sourcePath>
</sourcePaths>
<outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
<targetPackage>org.apache.streams.elasticsearch.pojo</targetPackage>
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml?rev=1572379&r1=1572378&r2=1572379&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml Thu Feb 27 01:28:48 2014
@@ -83,6 +83,7 @@
<generateBuilders>true</generateBuilders>
<sourcePaths>
<sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json</sourcePath>
+ <sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json</sourcePath>
</sourcePaths>
<outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
<targetPackage>org.apache.streams.hdfs.pojo</targetPackage>
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java?rev=1572379&r1=1572378&r2=1572379&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java Thu Feb 27 01:28:48 2014
@@ -28,7 +28,7 @@ import java.security.PrivilegedException
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
-public class WebHdfsPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable
+public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable
{
private final static Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class);
@@ -44,63 +44,24 @@ public class WebHdfsPersistWriter implem
private int fileLineCounter = 0;
private OutputStreamWriter currentWriter = null;
+ private static final int BYTES_IN_MB = 1024*1024;
+ private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
+ private volatile int totalByteCount = 0;
+ private volatile int byteCount = 0;
+
public boolean terminate = false;
protected volatile Queue<StreamsDatum> persistQueue;
private ObjectMapper mapper = new ObjectMapper();
- private HdfsConfiguration config;
-
- public WebHdfsPersistWriter() {
- Config config = StreamsConfigurator.config.getConfig("hdfs");
- this.config = HdfsConfigurator.detectConfiguration(config);
- this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
- }
-
- public WebHdfsPersistWriter(Queue<StreamsDatum> persistQueue) {
- Config config = StreamsConfigurator.config.getConfig("hdfs");
- this.config = HdfsConfigurator.detectConfiguration(config);
- this.persistQueue = persistQueue;
- }
-
- public WebHdfsPersistWriter(HdfsConfiguration config) {
- this.config = config;
- this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
- }
-
- public WebHdfsPersistWriter(HdfsConfiguration config, Queue<StreamsDatum> persistQueue) {
- this.config = config;
- this.persistQueue = persistQueue;
- }
-
- public WebHdfsPersistWriter(HdfsConfiguration config, Queue<StreamsDatum> persistQueue, Path path) {
- this.config = config;
- this.persistQueue = persistQueue;
- this.path = path;
- }
-
- public WebHdfsPersistWriter(HdfsConfiguration config, Queue<StreamsDatum> persistQueue, Path path, String filePart) {
- this.config = config;
- this.persistQueue = persistQueue;
- this.path = path;
- this.filePart = filePart;
- }
+ private HdfsConfiguration hdfsConfiguration;
- public WebHdfsPersistWriter(HdfsConfiguration config, Queue<StreamsDatum> persistQueue, Path path, String filePart, int linesPerFile) {
- this.config = config;
- this.persistQueue = persistQueue;
- this.path = path;
- this.filePart = filePart;
- this.linesPerFile = linesPerFile;
+ public WebHdfsPersistWriter(HdfsConfiguration hdfsConfiguration) { // NOTE(review): unlike the removed constructors, persistQueue is no longer initialized here — confirm nothing still reads it
+ this.hdfsConfiguration = hdfsConfiguration; // stored until prepare(), which replaces it with the object it is given
}
- private static final int BYTES_IN_MB = 1024*1024;
- private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
- private volatile int totalByteCount = 0;
- private volatile int byteCount = 0;
-
- public URI getURI() throws URISyntaxException { return new URI(WebHdfsFileSystem.SCHEME + "://" + config.getHost() + ":" + config.getPort()); }
+ public URI getURI() throws URISyntaxException { return new URI(WebHdfsFileSystem.SCHEME + "://" + hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort()); } // builds "<WebHdfsFileSystem.SCHEME>://host:port" from hdfsConfiguration
public boolean isConnected() { return (client != null); }
public final synchronized FileSystem getFileSystem()
@@ -115,8 +76,8 @@ public class WebHdfsPersistWriter implem
{
try
{
- LOGGER.info("User : {}", this.config.getUser());
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.config.getUser());
+ LOGGER.info("User : {}", this.hdfsConfiguration.getUser());
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser());
ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE);
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@@ -282,55 +243,11 @@ public class WebHdfsPersistWriter implem
.toString();
}
- public void start() {
-
- connectToWebHDFS();
-
- }
-
- public void stop() {
-
- try {
- flush();
- } catch (IOException e) {
- e.printStackTrace();
- }
- try {
- close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
-// public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
-// this.persistQueue = persistQueue;
-// }
-//
-// public Queue<StreamsDatum> getPersistQueue() {
-// return persistQueue;
-// }
-
-
- @Override
- public void run() {
-
- start();
-
- Thread task = new Thread(new WebHdfsPersistWriterTask(this));
- task.start();
-
- while( !terminate ) {
- try {
- Thread.sleep(new Random().nextInt(100));
- } catch (InterruptedException e) { }
- }
-
- stop();
- }
-
@Override
public void prepare(Object configurationObject) {
+ this.hdfsConfiguration = (HdfsWriterConfiguration)configurationObject; // NOTE(review): throws ClassCastException unless an HdfsWriterConfiguration is passed; replaces the configuration given to the constructor
connectToWebHDFS();
+ path = new Path(hdfsConfiguration.getPath()); // NOTE(review): path is set only after connectToWebHDFS() — confirm the connection step does not read it
}
@Override
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java?rev=1572379&r1=1572378&r2=1572379&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java Thu Feb 27 01:28:48 2014
@@ -33,7 +33,7 @@ import java.util.concurrent.*;
*/
public class TwitterTimelineProvider implements StreamsProvider, Serializable {
- private final static String STREAMS_ID = "TwitterTimelineProvider";
+ public final static String STREAMS_ID = "TwitterTimelineProvider";
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
Modified: incubator/streams/branches/STREAMS-26/streams-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/pom.xml?rev=1572379&r1=1572378&r2=1572379&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-core/pom.xml Thu Feb 27 01:28:48 2014
@@ -39,6 +39,24 @@
<artifactId>streams-util</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-json-org</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
Modified: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java?rev=1572379&r1=1572378&r2=1572379&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java Thu Feb 27 01:28:48 2014
@@ -173,8 +173,13 @@ public class StreamComponent {
}
}
else if(this.writer != null) {
- task = new StreamsPersistWriterTask((StreamsPersistWriter) SerializationUtil.cloneBySerialization(this.writer));
- task.addInputQueue(this.inQueue);
+ if(this.numTasks > 1) {
+ task = new StreamsPersistWriterTask((StreamsPersistWriter) SerializationUtil.cloneBySerialization(this.writer));
+ task.addInputQueue(this.inQueue);
+ } else {
+ task = new StreamsPersistWriterTask(this.writer);
+ task.addInputQueue(this.inQueue);
+ }
}
else if(this.provider != null) {
StreamsProvider prov = (StreamsProvider)SerializationUtil.cloneBySerialization(this.provider);
Modified: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java?rev=1572379&r1=1572378&r2=1572379&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java Thu Feb 27 01:28:48 2014
@@ -1,10 +1,14 @@
package org.apache.streams.core.tasks;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.pojo.json.Activity;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.util.SerializationUtil;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -20,6 +24,13 @@ public abstract class BaseStreamsTask im
private List<Queue<StreamsDatum>> inQueues = new ArrayList<Queue<StreamsDatum>>();
private List<Queue<StreamsDatum>> outQueues = new LinkedList<Queue<StreamsDatum>>();
private int inIndex = 0;
+ private ObjectMapper mapper;
+
+ public BaseStreamsTask() { // each task gets its own ObjectMapper, used by cloneStreamsDatum for JSON round-trip copies
+ this.mapper = new ObjectMapper();
+ this.mapper.registerSubtypes(Activity.class);
+ }
+
@Override
public void addInputQueue(Queue<StreamsDatum> inputQueue) {
@@ -68,9 +79,12 @@ public abstract class BaseStreamsTask im
this.outQueues.get(0).offer(datum);
}
else {
+ StreamsDatum newDatum = null;
for(Queue<StreamsDatum> queue : this.outQueues) {
try {
- queue.offer((StreamsDatum) SerializationUtil.deserialize(SerializationUtil.serialize(datum)));
+ newDatum = cloneStreamsDatum(datum);
+ if(newDatum != null)
+ queue.offer(newDatum);
} catch (RuntimeException e) {
LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum);
LOGGER.error("Exception while offering StreamsDatum to outgoing queue: {}", e);
@@ -79,6 +93,49 @@ public abstract class BaseStreamsTask im
}
}
+ /**
+ * Creates a copy of {@code datum}, or returns null if it cannot be copied. TODO: local-mode hack that still needs a proper fix.
+ * For our data streams to be portable to other dataflow frameworks (Storm, Hadoop, Spark, etc.), we need to be able to
+ * enforce the serialization each framework requires. This needs some thought and design before a final solution is
+ * chosen.
+ *
+ * To make StreamsDatums copyable, the original idea was to require every StreamsDatum to be Java-serializable.
+ * That was considered unacceptable for local mode, so until we have a solution that enforces serialization and is
+ * compatible across multiple frameworks, this hack is in place.
+ *
+ * If datum.document is Serializable, the whole datum is cloned via Java serialization. Otherwise an ObjectNode document is
+ * deep-copied, and an Activity (or any class the com.fasterxml.jackson.databind.ObjectMapper reports it can serialize) is copied by a JSON round trip, reusing the original timestamp and sequenceid. If the
+ * document matches none of these cases, or copying throws, an error is logged and null is returned.
+ *
+ * @param datum the datum to copy; its document must be non-null (see the note on the final log statement)
+ * @return a new StreamsDatum holding a copy of the document, or null if the datum could not be copied
+ */
+ protected StreamsDatum cloneStreamsDatum(StreamsDatum datum) {
+ try {
+ if(datum.document instanceof Serializable) { // checked first: clones the entire datum, not just the document
+ return (StreamsDatum) SerializationUtil.cloneBySerialization(datum);
+ }
+ else if(datum.document instanceof ObjectNode) {
+ return new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid);
+ }
+ else if(datum.document instanceof Activity) {
+
+ return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), Activity.class),
+ datum.timestamp,
+ datum.sequenceid);
+ }
+ else if(this.mapper.canSerialize(datum.document.getClass())){ // fallback: JSON round trip back into the document's own class
+ return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), datum.document.getClass()),
+ datum.timestamp,
+ datum.sequenceid);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Exception while trying to clone/copy StreamsDatum : {}", e); // NOTE(review): the {} placeholder consumes e, so SLF4J appears to log only e.toString(), not the stack trace — confirm
+ }
+ LOGGER.error("Failed to clone/copy StreamsDatum with document of class : {}", datum.document.getClass().getName()); // NOTE(review): a null datum.document reaches this line and throws NullPointerException outside the try
+ return null;
+ }
+
private int getNextInputQueueIndex() {
++this.inIndex;
if(this.inIndex >= this.inQueues.size()) {