You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/26 21:44:17 UTC

[1/2] git commit: Additional tests SLF4J logging More logging Class comments Fixed issue in constructor causing process document to return empty Version bump

Repository: incubator-streams
Updated Branches:
  refs/heads/master cbb60a470 -> e5c95d664


Additional tests
SLF4J logging
More logging
Class comments
Fixed issue in constructor causing process document to return empty
Version bump


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5688a282
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5688a282
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5688a282

Branch: refs/heads/master
Commit: 5688a28272f93d48af4ff7e2848f5db7803880cd
Parents: a4573f2
Author: sblackmon <sb...@w2odigital.com>
Authored: Fri May 23 13:16:00 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Fri May 23 13:16:00 2014 -0500

----------------------------------------------------------------------
 streams-runtimes/streams-runtime-pig/pom.xml    |  4 +-
 .../streams/pig/StreamsComponentFactory.java    |  7 ++-
 .../streams/pig/StreamsProcessDatumExec.java    |  9 ++-
 .../streams/pig/StreamsProcessDocumentExec.java | 15 +++--
 .../streams/pig/StreamsSerializerExec.java      |  8 +--
 .../streams/pig/test/AppendStringProcessor.java | 59 ++++++++++++++++++++
 .../streams/pig/test/CopyThriceProcessor.java   |  7 ++-
 .../streams/pig/test/DoNothingProcessor.java    |  9 ++-
 .../pig/test/PigProcessDocumentTest.java        | 27 +++++++++
 .../resources/pigprocessdocumentappendtest.pig  |  4 ++
 .../test/resources/pigprocessdocumenttest.pig   |  4 +-
 11 files changed, 129 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/pom.xml b/streams-runtimes/streams-runtime-pig/pom.xml
index 14c6994..28a4e16 100644
--- a/streams-runtimes/streams-runtime-pig/pom.xml
+++ b/streams-runtimes/streams-runtime-pig/pom.xml
@@ -29,8 +29,8 @@
     <artifactId>streams-runtime-pig</artifactId>
 
     <properties>
-        <hadoop-client.version>2.0.0-cdh4.5.0.1-SNAPSHOT</hadoop-client.version>
-        <pig.version>0.11.0-cdh4.5.0.1-SNAPSHOT</pig.version>
+        <hadoop-client.version>2.0.0-mr1-cdh4.6.0</hadoop-client.version>
+        <pig.version>0.11.0-cdh4.6.0</pig.version>
     </properties>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
index 1ef831c..e92a5ae 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.data.ActivitySerializer;
+import org.slf4j.Logger;
 
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
@@ -35,13 +36,15 @@ import java.util.List;
  */
 public class StreamsComponentFactory {
 
+    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsComponentFactory.class);
+
     public static ActivitySerializer getSerializerInstance(Class<?> serializerClazz) {
 
         Object object = null;
         try {
             object = serializerClazz.getConstructor().newInstance();
         } catch (Exception e) {
-            e.printStackTrace();
+            LOGGER.error(e.getMessage());
         }
 
         Preconditions.checkNotNull(object);
@@ -58,7 +61,7 @@ public class StreamsComponentFactory {
         try {
             object = processorClazz.getConstructor().newInstance();
         } catch (Exception e) {
-            e.printStackTrace();
+            LOGGER.error(e.getMessage());
         }
         StreamsProcessor processor = (StreamsProcessor) object;
         return processor;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
index 580712e..c19e097 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
@@ -33,6 +33,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.data.util.RFC3339Utils;
 import org.joda.time.DateTime;
+import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -45,6 +46,8 @@ import java.util.concurrent.TimeUnit;
 @MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
 public class StreamsProcessDatumExec extends AliasableEvalFunc<DataBag> {
 
+    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsProcessDatumExec.class);
+
     TupleFactory mTupleFactory = TupleFactory.getInstance();
     BagFactory mBagFactory = BagFactory.getInstance();
 
@@ -55,13 +58,13 @@ public class StreamsProcessDatumExec extends AliasableEvalFunc<DataBag> {
         Preconditions.checkArgument(execArgs.length > 0);
         String classFullName = execArgs[0];
         Preconditions.checkNotNull(classFullName);
-        String[] prepareArgs = new String[execArgs.length-1];
-        ArrayUtils.remove(execArgs, 0);
-        ArrayUtils.addAll(prepareArgs, execArgs);
+        String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0);
         streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
         if( execArgs.length == 1 ) {
+            LOGGER.debug("prepare (null)");
             streamsProcessor.prepare(null);
         } else if( execArgs.length > 1 ) {
+            LOGGER.debug("prepare " + Arrays.toString(prepareArgs));
             streamsProcessor.prepare(prepareArgs);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
index 2624197..788b347 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
@@ -38,6 +38,7 @@ import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -53,6 +54,8 @@ import java.util.concurrent.TimeUnit;
 @MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
 public class StreamsProcessDocumentExec extends SimpleEvalFunc<String> {
 
+    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsProcessDocumentExec.class);
+
     StreamsProcessor streamsProcessor;
     ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
@@ -61,13 +64,13 @@ public class StreamsProcessDocumentExec extends SimpleEvalFunc<String> {
         Preconditions.checkArgument(execArgs.length > 0);
         String classFullName = execArgs[0];
         Preconditions.checkNotNull(classFullName);
-        String[] prepareArgs = new String[execArgs.length-1];
-        ArrayUtils.remove(execArgs, 0);
-        ArrayUtils.addAll(prepareArgs, execArgs);
+        String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0);
         streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
         if( execArgs.length == 1 ) {
+            LOGGER.debug("prepare (null)");
             streamsProcessor.prepare(null);
         } else if( execArgs.length > 1 ) {
+            LOGGER.debug("prepare " + Arrays.toString(prepareArgs));
             streamsProcessor.prepare(prepareArgs);
         }
     }
@@ -77,17 +80,17 @@ public class StreamsProcessDocumentExec extends SimpleEvalFunc<String> {
         Preconditions.checkNotNull(streamsProcessor);
         Preconditions.checkNotNull(document);
 
-        System.out.println(document);
+        LOGGER.debug(document);
 
         StreamsDatum entry = new StreamsDatum(document);
 
         Preconditions.checkNotNull(entry);
 
-        System.out.println(entry);
+        LOGGER.debug(entry.toString());
 
         List<StreamsDatum> resultSet = streamsProcessor.process(entry);
 
-        System.out.println(resultSet);
+        LOGGER.debug(resultSet.toString());
 
         Object resultDoc = null;
         for( StreamsDatum resultDatum : resultSet ) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
index 83c893c..d517752 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
@@ -37,6 +37,7 @@ import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.List;
@@ -50,6 +51,8 @@ import java.util.concurrent.TimeUnit;
 @MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 10, intDefault = 10)
 public class StreamsSerializerExec extends SimpleEvalFunc<String> {
 
+    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsSerializerExec.class);
+
     ActivitySerializer activitySerializer;
     ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
@@ -58,9 +61,6 @@ public class StreamsSerializerExec extends SimpleEvalFunc<String> {
         Preconditions.checkArgument(execArgs.length > 0);
         String classFullName = execArgs[0];
         Preconditions.checkNotNull(classFullName);
-        String[] constructorArgs = new String[execArgs.length-1];
-        ArrayUtils.remove(execArgs, 0);
-        ArrayUtils.addAll(constructorArgs, execArgs);
         activitySerializer = StreamsComponentFactory.getSerializerInstance(Class.forName(classFullName));
     }
 
@@ -73,7 +73,7 @@ public class StreamsSerializerExec extends SimpleEvalFunc<String> {
         try {
             activity = activitySerializer.deserialize(document);
         } catch( Exception e ) {
-            e.printStackTrace();
+            LOGGER.warn(e.getMessage());
         }
         Preconditions.checkNotNull(activity);
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java
new file mode 100644
index 0000000..3f74750
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.pig.test;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.slf4j.Logger;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Used to Test Pig processor wrapper with arguments to prepare method
+ */
+public class AppendStringProcessor implements StreamsProcessor {
+
+    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(AppendStringProcessor.class);
+
+    String append;
+
+    public AppendStringProcessor() {
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        List<StreamsDatum> resultSet;
+        resultSet = new LinkedList<StreamsDatum>();
+        String value = (String) entry.getDocument()+ new String(append);
+        resultSet.add(new StreamsDatum(value));
+        return resultSet;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        append = ((String[]) configurationObject)[0];
+    }
+
+    @Override
+    public void cleanUp() {
+        LOGGER.info("Processor clean up");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java
index bb1a708..c7eeaf5 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java
@@ -21,15 +21,18 @@ package org.apache.streams.pig.test;
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+import org.slf4j.Logger;
 
 import java.util.LinkedList;
 import java.util.List;
 
 /**
- * Used to Test Pig processor wrapper
+ * Used to Test Pig processor wrapper when multiple datums are returned
  */
 public class CopyThriceProcessor implements StreamsProcessor {
 
+    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(CopyThriceProcessor.class);
+
     List<StreamsDatum> result;
 
     public CopyThriceProcessor() {
@@ -51,6 +54,6 @@ public class CopyThriceProcessor implements StreamsProcessor {
 
     @Override
     public void cleanUp() {
-        System.out.println("Processor clean up!");
+        LOGGER.info("Processor clean up");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java
index 91a900a..37d417b 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java
@@ -21,15 +21,18 @@ package org.apache.streams.pig.test;
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+import org.slf4j.Logger;
 
 import java.util.LinkedList;
 import java.util.List;
 
 /**
- * Used to Test Pig processor wrapper
+ * Used to Test Pig processor wrapper - datum passthrough
  */
 public class DoNothingProcessor implements StreamsProcessor {
 
+    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DoNothingProcessor.class);
+
     List<StreamsDatum> result;
 
     public DoNothingProcessor() {
@@ -44,11 +47,11 @@ public class DoNothingProcessor implements StreamsProcessor {
 
     @Override
     public void prepare(Object configurationObject) {
-
+        LOGGER.info("Processor prepare");
     }
 
     @Override
     public void cleanUp() {
-        System.out.println("Processor clean up!");
+        LOGGER.info("Processor clean up");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
index c73ca4e..4c7b6b2 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
@@ -87,4 +87,31 @@ public class PigProcessDocumentTest {
         test.assertOutput("in", input, "out", output);
 
     }
+
+    @Test
+    public void testPigProcessAppendDocument() throws Exception {
+
+        String[] input = {
+                "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\thowdy"
+        };
+
+        AppendStringProcessor processor = new AppendStringProcessor();
+
+        String doc = (String) StringUtils.split(input[0], '\t').get(3);
+        StreamsDatum inputDatum = new StreamsDatum(doc);
+        inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
+
+        processor.prepare(new String[]{"doody"});
+
+        StreamsDatum resultDatum = processor.process(inputDatum).get(0);
+        String resultDocument = (String) resultDatum.getDocument();
+
+        String[] output = new String[1];
+        output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")";
+
+        PigTest test;
+        test = new PigTest("src/test/resources/pigprocessdocumentappendtest.pig");
+        test.assertOutput("in", input, "out", output);
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumentappendtest.pig
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumentappendtest.pig b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumentappendtest.pig
new file mode 100644
index 0000000..6b159ea
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumentappendtest.pig
@@ -0,0 +1,4 @@
+DEFINE PROCESSOR org.apache.streams.pig.StreamsProcessDocumentExec('org.apache.streams.pig.test.AppendStringProcessor', 'doody');
+in = LOAD '*' USING PigStorage('\t') AS (id: chararray, source: chararray, timestamp: long, object: chararray);
+out = FOREACH in GENERATE id, source, timestamp, PROCESSOR(object);
+STORE out INTO 'target' USING PigStorage('\t');

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumenttest.pig
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumenttest.pig b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumenttest.pig
index 3685e22..60df581 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumenttest.pig
+++ b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumenttest.pig
@@ -1,4 +1,4 @@
 DEFINE PROCESSOR org.apache.streams.pig.StreamsProcessDocumentExec('org.apache.streams.pig.test.DoNothingProcessor');
-in = LOAD '*' USING PigStorage('\t') AS (activityid: chararray, source: chararray, timestamp: long, object: chararray);
-out = FOREACH in GENERATE activityid, source, timestamp, PROCESSOR(object);
+in = LOAD '*' USING PigStorage('\t') AS (id: chararray, source: chararray, timestamp: long, object: chararray);
+out = FOREACH in GENERATE id, source, timestamp, PROCESSOR(object);
 STORE out INTO 'target' USING PigStorage('\t');


[2/2] git commit: Merge commit '5688a28272f93d48af4ff7e2848f5db7803880cd'

Posted by sb...@apache.org.
Merge commit '5688a28272f93d48af4ff7e2848f5db7803880cd'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e5c95d66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e5c95d66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e5c95d66

Branch: refs/heads/master
Commit: e5c95d664d3ac38abbef17bf204dcb56821d5c51
Parents: cbb60a4 5688a28
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Mon May 26 14:44:06 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Mon May 26 14:44:06 2014 -0500

----------------------------------------------------------------------
 streams-runtimes/streams-runtime-pig/pom.xml    |  4 +-
 .../streams/pig/StreamsComponentFactory.java    |  7 ++-
 .../streams/pig/StreamsProcessDatumExec.java    |  9 ++-
 .../streams/pig/StreamsProcessDocumentExec.java | 15 +++--
 .../streams/pig/StreamsSerializerExec.java      |  8 +--
 .../streams/pig/test/AppendStringProcessor.java | 59 ++++++++++++++++++++
 .../streams/pig/test/CopyThriceProcessor.java   |  7 ++-
 .../streams/pig/test/DoNothingProcessor.java    |  9 ++-
 .../pig/test/PigProcessDocumentTest.java        | 27 +++++++++
 .../resources/pigprocessdocumentappendtest.pig  |  4 ++
 .../test/resources/pigprocessdocumenttest.pig   |  4 +-
 11 files changed, 129 insertions(+), 24 deletions(-)
----------------------------------------------------------------------