You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/26 21:44:17 UTC
[1/2] git commit: Additional tests SLF4J logging More logging Class
comments Fixed issue in constructor causing process document to return empty
Version bump
Repository: incubator-streams
Updated Branches:
refs/heads/master cbb60a470 -> e5c95d664
Additional tests
SLF4J logging
More logging
Class comments
Fixed issue in constructor causing process document to return empty
Version bump
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5688a282
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5688a282
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5688a282
Branch: refs/heads/master
Commit: 5688a28272f93d48af4ff7e2848f5db7803880cd
Parents: a4573f2
Author: sblackmon <sb...@w2odigital.com>
Authored: Fri May 23 13:16:00 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Fri May 23 13:16:00 2014 -0500
----------------------------------------------------------------------
streams-runtimes/streams-runtime-pig/pom.xml | 4 +-
.../streams/pig/StreamsComponentFactory.java | 7 ++-
.../streams/pig/StreamsProcessDatumExec.java | 9 ++-
.../streams/pig/StreamsProcessDocumentExec.java | 15 +++--
.../streams/pig/StreamsSerializerExec.java | 8 +--
.../streams/pig/test/AppendStringProcessor.java | 59 ++++++++++++++++++++
.../streams/pig/test/CopyThriceProcessor.java | 7 ++-
.../streams/pig/test/DoNothingProcessor.java | 9 ++-
.../pig/test/PigProcessDocumentTest.java | 27 +++++++++
.../resources/pigprocessdocumentappendtest.pig | 4 ++
.../test/resources/pigprocessdocumenttest.pig | 4 +-
11 files changed, 129 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/pom.xml b/streams-runtimes/streams-runtime-pig/pom.xml
index 14c6994..28a4e16 100644
--- a/streams-runtimes/streams-runtime-pig/pom.xml
+++ b/streams-runtimes/streams-runtime-pig/pom.xml
@@ -29,8 +29,8 @@
<artifactId>streams-runtime-pig</artifactId>
<properties>
- <hadoop-client.version>2.0.0-cdh4.5.0.1-SNAPSHOT</hadoop-client.version>
- <pig.version>0.11.0-cdh4.5.0.1-SNAPSHOT</pig.version>
+ <hadoop-client.version>2.0.0-mr1-cdh4.6.0</hadoop-client.version>
+ <pig.version>0.11.0-cdh4.6.0</pig.version>
</properties>
<dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
index 1ef831c..e92a5ae 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import org.apache.commons.lang.ArrayUtils;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.data.ActivitySerializer;
+import org.slf4j.Logger;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
@@ -35,13 +36,15 @@ import java.util.List;
*/
public class StreamsComponentFactory {
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsComponentFactory.class);
+
public static ActivitySerializer getSerializerInstance(Class<?> serializerClazz) {
Object object = null;
try {
object = serializerClazz.getConstructor().newInstance();
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.error(e.getMessage());
}
Preconditions.checkNotNull(object);
@@ -58,7 +61,7 @@ public class StreamsComponentFactory {
try {
object = processorClazz.getConstructor().newInstance();
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.error(e.getMessage());
}
StreamsProcessor processor = (StreamsProcessor) object;
return processor;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
index 580712e..c19e097 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
@@ -33,6 +33,7 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.data.util.RFC3339Utils;
import org.joda.time.DateTime;
+import org.slf4j.Logger;
import java.io.IOException;
import java.util.Arrays;
@@ -45,6 +46,8 @@ import java.util.concurrent.TimeUnit;
@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
public class StreamsProcessDatumExec extends AliasableEvalFunc<DataBag> {
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsProcessDatumExec.class);
+
TupleFactory mTupleFactory = TupleFactory.getInstance();
BagFactory mBagFactory = BagFactory.getInstance();
@@ -55,13 +58,13 @@ public class StreamsProcessDatumExec extends AliasableEvalFunc<DataBag> {
Preconditions.checkArgument(execArgs.length > 0);
String classFullName = execArgs[0];
Preconditions.checkNotNull(classFullName);
- String[] prepareArgs = new String[execArgs.length-1];
- ArrayUtils.remove(execArgs, 0);
- ArrayUtils.addAll(prepareArgs, execArgs);
+ String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0);
streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
if( execArgs.length == 1 ) {
+ LOGGER.debug("prepare (null)");
streamsProcessor.prepare(null);
} else if( execArgs.length > 1 ) {
+ LOGGER.debug("prepare " + Arrays.toString(prepareArgs));
streamsProcessor.prepare(prepareArgs);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
index 2624197..788b347 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
@@ -38,6 +38,7 @@ import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.data.ActivitySerializer;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
import java.io.IOException;
import java.util.Arrays;
@@ -53,6 +54,8 @@ import java.util.concurrent.TimeUnit;
@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
public class StreamsProcessDocumentExec extends SimpleEvalFunc<String> {
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsProcessDocumentExec.class);
+
StreamsProcessor streamsProcessor;
ObjectMapper mapper = StreamsJacksonMapper.getInstance();
@@ -61,13 +64,13 @@ public class StreamsProcessDocumentExec extends SimpleEvalFunc<String> {
Preconditions.checkArgument(execArgs.length > 0);
String classFullName = execArgs[0];
Preconditions.checkNotNull(classFullName);
- String[] prepareArgs = new String[execArgs.length-1];
- ArrayUtils.remove(execArgs, 0);
- ArrayUtils.addAll(prepareArgs, execArgs);
+ String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0);
streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
if( execArgs.length == 1 ) {
+ LOGGER.debug("prepare (null)");
streamsProcessor.prepare(null);
} else if( execArgs.length > 1 ) {
+ LOGGER.debug("prepare " + Arrays.toString(prepareArgs));
streamsProcessor.prepare(prepareArgs);
}
}
@@ -77,17 +80,17 @@ public class StreamsProcessDocumentExec extends SimpleEvalFunc<String> {
Preconditions.checkNotNull(streamsProcessor);
Preconditions.checkNotNull(document);
- System.out.println(document);
+ LOGGER.debug(document);
StreamsDatum entry = new StreamsDatum(document);
Preconditions.checkNotNull(entry);
- System.out.println(entry);
+ LOGGER.debug(entry.toString());
List<StreamsDatum> resultSet = streamsProcessor.process(entry);
- System.out.println(resultSet);
+ LOGGER.debug(resultSet.toString());
Object resultDoc = null;
for( StreamsDatum resultDatum : resultSet ) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
index 83c893c..d517752 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
@@ -37,6 +37,7 @@ import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.data.ActivitySerializer;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
import java.io.IOException;
import java.util.List;
@@ -50,6 +51,8 @@ import java.util.concurrent.TimeUnit;
@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 10, intDefault = 10)
public class StreamsSerializerExec extends SimpleEvalFunc<String> {
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsSerializerExec.class);
+
ActivitySerializer activitySerializer;
ObjectMapper mapper = StreamsJacksonMapper.getInstance();
@@ -58,9 +61,6 @@ public class StreamsSerializerExec extends SimpleEvalFunc<String> {
Preconditions.checkArgument(execArgs.length > 0);
String classFullName = execArgs[0];
Preconditions.checkNotNull(classFullName);
- String[] constructorArgs = new String[execArgs.length-1];
- ArrayUtils.remove(execArgs, 0);
- ArrayUtils.addAll(constructorArgs, execArgs);
activitySerializer = StreamsComponentFactory.getSerializerInstance(Class.forName(classFullName));
}
@@ -73,7 +73,7 @@ public class StreamsSerializerExec extends SimpleEvalFunc<String> {
try {
activity = activitySerializer.deserialize(document);
} catch( Exception e ) {
- e.printStackTrace();
+ LOGGER.warn(e.getMessage());
}
Preconditions.checkNotNull(activity);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java
new file mode 100644
index 0000000..3f74750
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.pig.test;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.slf4j.Logger;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Used to Test Pig processor wrapper with arguments to prepare method
+ */
+public class AppendStringProcessor implements StreamsProcessor {
+
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(AppendStringProcessor.class);
+
+ String append;
+
+ public AppendStringProcessor() {
+ }
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ List<StreamsDatum> resultSet;
+ resultSet = new LinkedList<StreamsDatum>();
+ String value = (String) entry.getDocument()+ new String(append);
+ resultSet.add(new StreamsDatum(value));
+ return resultSet;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ append = ((String[]) configurationObject)[0];
+ }
+
+ @Override
+ public void cleanUp() {
+ LOGGER.info("Processor clean up");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java
index bb1a708..c7eeaf5 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java
@@ -21,15 +21,18 @@ package org.apache.streams.pig.test;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
+import org.slf4j.Logger;
import java.util.LinkedList;
import java.util.List;
/**
- * Used to Test Pig processor wrapper
+ * Used to Test Pig processor wrapper when multiple datums are returned
*/
public class CopyThriceProcessor implements StreamsProcessor {
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(CopyThriceProcessor.class);
+
List<StreamsDatum> result;
public CopyThriceProcessor() {
@@ -51,6 +54,6 @@ public class CopyThriceProcessor implements StreamsProcessor {
@Override
public void cleanUp() {
- System.out.println("Processor clean up!");
+ LOGGER.info("Processor clean up");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java
index 91a900a..37d417b 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java
@@ -21,15 +21,18 @@ package org.apache.streams.pig.test;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
+import org.slf4j.Logger;
import java.util.LinkedList;
import java.util.List;
/**
- * Used to Test Pig processor wrapper
+ * Used to Test Pig processor wrapper - datum passthrough
*/
public class DoNothingProcessor implements StreamsProcessor {
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DoNothingProcessor.class);
+
List<StreamsDatum> result;
public DoNothingProcessor() {
@@ -44,11 +47,11 @@ public class DoNothingProcessor implements StreamsProcessor {
@Override
public void prepare(Object configurationObject) {
-
+ LOGGER.info("Processor prepare");
}
@Override
public void cleanUp() {
- System.out.println("Processor clean up!");
+ LOGGER.info("Processor clean up");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
index c73ca4e..4c7b6b2 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
@@ -87,4 +87,31 @@ public class PigProcessDocumentTest {
test.assertOutput("in", input, "out", output);
}
+
+ @Test
+ public void testPigProcessAppendDocument() throws Exception {
+
+ String[] input = {
+ "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\thowdy"
+ };
+
+ AppendStringProcessor processor = new AppendStringProcessor();
+
+ String doc = (String) StringUtils.split(input[0], '\t').get(3);
+ StreamsDatum inputDatum = new StreamsDatum(doc);
+ inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
+
+ processor.prepare(new String[]{"doody"});
+
+ StreamsDatum resultDatum = processor.process(inputDatum).get(0);
+ String resultDocument = (String) resultDatum.getDocument();
+
+ String[] output = new String[1];
+ output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")";
+
+ PigTest test;
+ test = new PigTest("src/test/resources/pigprocessdocumentappendtest.pig");
+ test.assertOutput("in", input, "out", output);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumentappendtest.pig
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumentappendtest.pig b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumentappendtest.pig
new file mode 100644
index 0000000..6b159ea
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumentappendtest.pig
@@ -0,0 +1,4 @@
+DEFINE PROCESSOR org.apache.streams.pig.StreamsProcessDocumentExec('org.apache.streams.pig.test.AppendStringProcessor', 'doody');
+in = LOAD '*' USING PigStorage('\t') AS (id: chararray, source: chararray, timestamp: long, object: chararray);
+out = FOREACH in GENERATE id, source, timestamp, PROCESSOR(object);
+STORE out INTO 'target' USING PigStorage('\t');
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5688a282/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumenttest.pig
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumenttest.pig b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumenttest.pig
index 3685e22..60df581 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumenttest.pig
+++ b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessdocumenttest.pig
@@ -1,4 +1,4 @@
DEFINE PROCESSOR org.apache.streams.pig.StreamsProcessDocumentExec('org.apache.streams.pig.test.DoNothingProcessor');
-in = LOAD '*' USING PigStorage('\t') AS (activityid: chararray, source: chararray, timestamp: long, object: chararray);
-out = FOREACH in GENERATE activityid, source, timestamp, PROCESSOR(object);
+in = LOAD '*' USING PigStorage('\t') AS (id: chararray, source: chararray, timestamp: long, object: chararray);
+out = FOREACH in GENERATE id, source, timestamp, PROCESSOR(object);
STORE out INTO 'target' USING PigStorage('\t');
[2/2] git commit: Merge commit
'5688a28272f93d48af4ff7e2848f5db7803880cd'
Posted by sb...@apache.org.
Merge commit '5688a28272f93d48af4ff7e2848f5db7803880cd'
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e5c95d66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e5c95d66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e5c95d66
Branch: refs/heads/master
Commit: e5c95d664d3ac38abbef17bf204dcb56821d5c51
Parents: cbb60a4 5688a28
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Mon May 26 14:44:06 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Mon May 26 14:44:06 2014 -0500
----------------------------------------------------------------------
streams-runtimes/streams-runtime-pig/pom.xml | 4 +-
.../streams/pig/StreamsComponentFactory.java | 7 ++-
.../streams/pig/StreamsProcessDatumExec.java | 9 ++-
.../streams/pig/StreamsProcessDocumentExec.java | 15 +++--
.../streams/pig/StreamsSerializerExec.java | 8 +--
.../streams/pig/test/AppendStringProcessor.java | 59 ++++++++++++++++++++
.../streams/pig/test/CopyThriceProcessor.java | 7 ++-
.../streams/pig/test/DoNothingProcessor.java | 9 ++-
.../pig/test/PigProcessDocumentTest.java | 27 +++++++++
.../resources/pigprocessdocumentappendtest.pig | 4 ++
.../test/resources/pigprocessdocumenttest.pig | 4 +-
11 files changed, 129 insertions(+), 24 deletions(-)
----------------------------------------------------------------------