You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/02/27 02:28:49 UTC

svn commit: r1572379 - in /incubator/streams/branches/STREAMS-26: streams-contrib/streams-persist-elasticsearch/ streams-contrib/streams-persist-hdfs/ streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/ streams-contrib/streams-p...

Author: sblackmon
Date: Thu Feb 27 01:28:48 2014
New Revision: 1572379

URL: http://svn.apache.org/r1572379
Log:
applied ryan's patch for type-aware deepcopy
avoiding needing to serialize member fields of writers

Modified:
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
    incubator/streams/branches/STREAMS-26/streams-core/pom.xml
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml?rev=1572379&r1=1572378&r2=1572379&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml Thu Feb 27 01:28:48 2014
@@ -74,7 +74,7 @@
                     <generateBuilders>true</generateBuilders>
                     <sourcePaths>
                         <sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json</sourcePath>
-                        <sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/HdfsWriterConfiguration.json</sourcePath>
                     </sourcePaths>
                     <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
                     <targetPackage>org.apache.streams.elasticsearch.pojo</targetPackage>

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml?rev=1572379&r1=1572378&r2=1572379&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml Thu Feb 27 01:28:48 2014
@@ -83,6 +83,7 @@
                     <generateBuilders>true</generateBuilders>
                     <sourcePaths>
                         <sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json</sourcePath>
                     </sourcePaths>
                     <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
                     <targetPackage>org.apache.streams.hdfs.pojo</targetPackage>

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java?rev=1572379&r1=1572378&r2=1572379&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java Thu Feb 27 01:28:48 2014
@@ -28,7 +28,7 @@ import java.security.PrivilegedException
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-public class WebHdfsPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable
+public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable
 {
     private final static Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class);
 
@@ -44,63 +44,24 @@ public class WebHdfsPersistWriter implem
     private int fileLineCounter = 0;
     private OutputStreamWriter currentWriter = null;
 
+    private static final int  BYTES_IN_MB = 1024*1024;
+    private static final int  BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
+    private volatile int  totalByteCount = 0;
+    private volatile int  byteCount = 0;
+
     public boolean terminate = false;
 
     protected volatile Queue<StreamsDatum> persistQueue;
 
     private ObjectMapper mapper = new ObjectMapper();
 
-    private HdfsConfiguration config;
-
-    public WebHdfsPersistWriter() {
-        Config config = StreamsConfigurator.config.getConfig("hdfs");
-        this.config = HdfsConfigurator.detectConfiguration(config);
-        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
-    }
-
-    public WebHdfsPersistWriter(Queue<StreamsDatum> persistQueue) {
-        Config config = StreamsConfigurator.config.getConfig("hdfs");
-        this.config = HdfsConfigurator.detectConfiguration(config);
-        this.persistQueue = persistQueue;
-    }
-
-    public WebHdfsPersistWriter(HdfsConfiguration config) {
-        this.config = config;
-        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
-    }
-
-    public WebHdfsPersistWriter(HdfsConfiguration config, Queue<StreamsDatum> persistQueue) {
-        this.config = config;
-        this.persistQueue = persistQueue;
-    }
-
-    public WebHdfsPersistWriter(HdfsConfiguration config, Queue<StreamsDatum> persistQueue, Path path) {
-        this.config = config;
-        this.persistQueue = persistQueue;
-        this.path = path;
-    }
-
-    public WebHdfsPersistWriter(HdfsConfiguration config, Queue<StreamsDatum> persistQueue, Path path, String filePart) {
-        this.config = config;
-        this.persistQueue = persistQueue;
-        this.path = path;
-        this.filePart = filePart;
-    }
+    private HdfsConfiguration hdfsConfiguration;
 
-    public WebHdfsPersistWriter(HdfsConfiguration config, Queue<StreamsDatum> persistQueue, Path path, String filePart, int linesPerFile) {
-        this.config = config;
-        this.persistQueue = persistQueue;
-        this.path = path;
-        this.filePart = filePart;
-        this.linesPerFile = linesPerFile;
+    public WebHdfsPersistWriter(HdfsConfiguration hdfsConfiguration) {
+        this.hdfsConfiguration = hdfsConfiguration;
     }
 
-    private static final int  BYTES_IN_MB = 1024*1024;
-    private static final int  BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
-    private volatile int  totalByteCount = 0;
-    private volatile int  byteCount = 0;
-
-    public URI getURI() throws URISyntaxException { return new URI(WebHdfsFileSystem.SCHEME + "://" + config.getHost() + ":" + config.getPort()); }
+    public URI getURI() throws URISyntaxException { return new URI(WebHdfsFileSystem.SCHEME + "://" + hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort()); }
     public boolean isConnected() 		                { return (client != null); }
 
     public final synchronized FileSystem getFileSystem()
@@ -115,8 +76,8 @@ public class WebHdfsPersistWriter implem
     {
         try
         {
-            LOGGER.info("User : {}", this.config.getUser());
-            UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.config.getUser());
+            LOGGER.info("User : {}", this.hdfsConfiguration.getUser());
+            UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser());
             ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE);
 
             ugi.doAs(new PrivilegedExceptionAction<Void>() {
@@ -282,55 +243,11 @@ public class WebHdfsPersistWriter implem
                     .toString();
     }
 
-    public void start() {
-
-        connectToWebHDFS();
-
-    }
-
-    public void stop() {
-
-        try {
-            flush();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        try {
-            close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-//    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
-//        this.persistQueue = persistQueue;
-//    }
-//
-//    public Queue<StreamsDatum> getPersistQueue() {
-//        return persistQueue;
-//    }
-
-
-    @Override
-    public void run() {
-
-        start();
-
-        Thread task = new Thread(new WebHdfsPersistWriterTask(this));
-        task.start();
-
-        while( !terminate ) {
-            try {
-                Thread.sleep(new Random().nextInt(100));
-            } catch (InterruptedException e) { }
-        }
-
-        stop();
-    }
-
     @Override
     public void prepare(Object configurationObject) {
+        this.hdfsConfiguration = (HdfsWriterConfiguration)configurationObject;
         connectToWebHDFS();
+        path = new Path(hdfsConfiguration.getPath());
     }
 
     @Override

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java?rev=1572379&r1=1572378&r2=1572379&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java Thu Feb 27 01:28:48 2014
@@ -33,7 +33,7 @@ import java.util.concurrent.*;
  */
 public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
-    private final static String STREAMS_ID = "TwitterTimelineProvider";
+    public final static String STREAMS_ID = "TwitterTimelineProvider";
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
 

Modified: incubator/streams/branches/STREAMS-26/streams-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/pom.xml?rev=1572379&r1=1572378&r2=1572379&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-core/pom.xml Thu Feb 27 01:28:48 2014
@@ -39,6 +39,24 @@
             <artifactId>streams-util</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-json-org</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>

Modified: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java?rev=1572379&r1=1572378&r2=1572379&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java Thu Feb 27 01:28:48 2014
@@ -173,8 +173,13 @@ public class StreamComponent {
             }
         }
         else if(this.writer != null) {
-            task = new StreamsPersistWriterTask((StreamsPersistWriter) SerializationUtil.cloneBySerialization(this.writer));
-            task.addInputQueue(this.inQueue);
+            if(this.numTasks > 1) {
+                task = new StreamsPersistWriterTask((StreamsPersistWriter) SerializationUtil.cloneBySerialization(this.writer));
+                task.addInputQueue(this.inQueue);
+            } else {
+                task = new StreamsPersistWriterTask(this.writer);
+                task.addInputQueue(this.inQueue);
+            }
         }
         else if(this.provider != null) {
             StreamsProvider prov = (StreamsProvider)SerializationUtil.cloneBySerialization(this.provider);

Modified: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java?rev=1572379&r1=1572378&r2=1572379&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java Thu Feb 27 01:28:48 2014
@@ -1,10 +1,14 @@
 package org.apache.streams.core.tasks;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.util.SerializationUtil;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -20,6 +24,13 @@ public abstract class BaseStreamsTask im
     private List<Queue<StreamsDatum>> inQueues = new ArrayList<Queue<StreamsDatum>>();
     private List<Queue<StreamsDatum>> outQueues = new LinkedList<Queue<StreamsDatum>>();
     private int inIndex = 0;
+    private ObjectMapper mapper;
+
+    public BaseStreamsTask() {
+        this.mapper = new ObjectMapper();
+        this.mapper.registerSubtypes(Activity.class);
+    }
+
 
     @Override
     public void addInputQueue(Queue<StreamsDatum> inputQueue) {
@@ -68,9 +79,12 @@ public abstract class BaseStreamsTask im
             this.outQueues.get(0).offer(datum);
         }
         else {
+            StreamsDatum newDatum = null;
             for(Queue<StreamsDatum> queue : this.outQueues) {
                 try {
-                    queue.offer((StreamsDatum) SerializationUtil.deserialize(SerializationUtil.serialize(datum)));
+                    newDatum = cloneStreamsDatum(datum);
+                    if(newDatum != null)
+                        queue.offer(newDatum);
                 } catch (RuntimeException e) {
                     LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum);
                     LOGGER.error("Exception while offering StreamsDatum to outgoing queue: {}", e);
@@ -79,6 +93,49 @@ public abstract class BaseStreamsTask im
         }
     }
 
+    /**
+     * //TODO LOCAL MODE HACK. Need to fix
+     * In order for our data streams to be ported to other data flow frameworks (Storm, Hadoop, Spark, etc.) we need to be able to
+     * enforce the serialization required by each framework.  This needs some thought and design before a final solution is
+     * made.
+     *
+     * In order to be able to copy/clone StreamsDatums the original idea was to force all StreamsDatums to be Java serializable.
+     * This was seen as unacceptable for local mode.  So until we come up with a solution to enforce serialization and be
+     * compatible across multiple frameworks, this hack is in place.
+     *
+     * If datum.document is Serializable, we use serialization to clone a new copy.  If it is not Serializable we attempt
+     * different methods using a com.fasterxml.jackson.databind.ObjectMapper to copy/clone the StreamsDatum. If the object
+     * is not cloneable by these methods, an error is logged and null is returned.
+     *
+     * @param datum
+     * @return
+     */
+    protected StreamsDatum cloneStreamsDatum(StreamsDatum datum) {
+        try {
+            if(datum.document instanceof Serializable) {
+                return (StreamsDatum) SerializationUtil.cloneBySerialization(datum);
+            }
+            else if(datum.document instanceof ObjectNode) {
+                return new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid);
+            }
+            else if(datum.document instanceof Activity) {
+
+                return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), Activity.class),
+                                        datum.timestamp,
+                                        datum.sequenceid);
+            }
+            else if(this.mapper.canSerialize(datum.document.getClass())){
+                return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), datum.document.getClass()),
+                                        datum.timestamp,
+                                        datum.sequenceid);
+            }
+        } catch (Exception e) {
+            LOGGER.error("Exception while trying to clone/copy StreamsDatum : {}", e);
+        }
+        LOGGER.error("Failed to clone/copy StreamsDatum with document of class : {}", datum.document.getClass().getName());
+        return null;
+    }
+
     private int getNextInputQueueIndex() {
         ++this.inIndex;
         if(this.inIndex >= this.inQueues.size()) {