You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:24:45 UTC

[04/42] incubator-streams git commit: STREAMS-440: custom checkstyle.xml, address compliance

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DatumCounterWriter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DatumCounterWriter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DatumCounterWriter.java
index b7f777e..0c7af1e 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DatumCounterWriter.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DatumCounterWriter.java
@@ -18,11 +18,16 @@
 
 package org.apache.streams.local.test.writer;
 
-import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 
-import java.util.*;
+import com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -31,79 +36,79 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class DatumCounterWriter implements StreamsPersistWriter{
 
-    @Override
-    public String getId() {
-        return "DatumCounterWriter";
-    }
+  @Override
+  public String getId() {
+    return "DatumCounterWriter";
+  }
 
-    /**
-     * Set of all ids that have been claimed.  Ensures all instances are assigned unique ids
-     */
-    public static Set<Integer> CLAIMED_ID = new HashSet<Integer>();
-    /**
-     * Random instance to generate ids
-     */
-    public static final Random RAND = new Random();
-    /**
-     * Set of instance ids that received data. Usefully for testing parrallelization is actually working.
-     */
-    public final static Set<Integer> SEEN_DATA = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
-    /**
-     * The total count of data seen by a all instances of a processor.
-     */
-    public static final ConcurrentHashMap<String, AtomicLong> COUNTS = new ConcurrentHashMap<>();
-    /**
-     * The documents received
-     */
-    public static final ConcurrentHashMap<String, List<Object>> RECEIVED = new ConcurrentHashMap<>();
+  /**
+   * Set of all ids that have been claimed.  Ensures all instances are assigned unique ids
+   */
+  public static Set<Integer> CLAIMED_ID = new HashSet<Integer>();
+  /**
+   * Random instance to generate ids
+   */
+  public static final Random RAND = new Random();
+  /**
+   * Set of instance ids that received data. Usefully for testing parrallelization is actually working.
+   */
+  public final static Set<Integer> SEEN_DATA = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
+  /**
+   * The total count of data seen by a all instances of a processor.
+   */
+  public static final ConcurrentHashMap<String, AtomicLong> COUNTS = new ConcurrentHashMap<>();
+  /**
+   * The documents received
+   */
+  public static final ConcurrentHashMap<String, List<Object>> RECEIVED = new ConcurrentHashMap<>();
 
-    private int counter = 0;
-    private String writerId;
-    private Integer id;
+  private int counter = 0;
+  private String writerId;
+  private Integer id;
 
-    public DatumCounterWriter(String writerId) {
-        this.writerId = writerId;
-    }
+  public DatumCounterWriter(String writerId) {
+    this.writerId = writerId;
+  }
 
-    @Override
-    public void write(StreamsDatum entry) {
-        ++this.counter;
-        SEEN_DATA.add(this.id);
-        synchronized (RECEIVED) {
-            List<Object> documents = RECEIVED.get(this.writerId);
-            if(documents == null) {
-                List<Object> docs = Lists.newLinkedList();
-                docs.add(entry.getDocument());
-                RECEIVED.put(this.writerId, docs);
-            } else {
-                documents.add(entry.getDocument());
-            }
-        }
+  @Override
+  public void write(StreamsDatum entry) {
+    ++this.counter;
+    SEEN_DATA.add(this.id);
+    synchronized (RECEIVED) {
+      List<Object> documents = RECEIVED.get(this.writerId);
+      if(documents == null) {
+        List<Object> docs = Lists.newLinkedList();
+        docs.add(entry.getDocument());
+        RECEIVED.put(this.writerId, docs);
+      } else {
+        documents.add(entry.getDocument());
+      }
     }
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        synchronized (CLAIMED_ID) {
-            this.id = RAND.nextInt();
-            while(!CLAIMED_ID.add(this.id)) {
-                this.id = RAND.nextInt();
-            }
-        }
+  @Override
+  public void prepare(Object configurationObject) {
+    synchronized (CLAIMED_ID) {
+      this.id = RAND.nextInt();
+      while(!CLAIMED_ID.add(this.id)) {
+        this.id = RAND.nextInt();
+      }
     }
+  }
 
-    @Override
-    public void cleanUp() {
-        synchronized (COUNTS) {
-            AtomicLong count = COUNTS.get(this.writerId);
-            if(count == null) {
-                COUNTS.put(this.writerId, new AtomicLong(this.counter));
-            } else {
-                count.addAndGet(this.counter);
-            }
-        }
+  @Override
+  public void cleanUp() {
+    synchronized (COUNTS) {
+      AtomicLong count = COUNTS.get(this.writerId);
+      if(count == null) {
+        COUNTS.put(this.writerId, new AtomicLong(this.counter));
+      } else {
+        count.addAndGet(this.counter);
+      }
     }
+  }
 
-    public int getDatumsCounted() {
-        return this.counter;
-    }
+  public int getDatumsCounted() {
+    return this.counter;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DoNothingWriter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DoNothingWriter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DoNothingWriter.java
index d9ec6d3..48f4b68 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DoNothingWriter.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DoNothingWriter.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.test.writer;
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,25 +29,25 @@ import org.slf4j.LoggerFactory;
  */
 public class DoNothingWriter implements StreamsPersistWriter {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(DoNothingWriter.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(DoNothingWriter.class);
 
-    @Override
-    public String getId() {
-        return "DoNothingWriter";
-    }
+  @Override
+  public String getId() {
+    return "DoNothingWriter";
+  }
 
-    @Override
-    public void write(StreamsDatum entry) {
+  @Override
+  public void write(StreamsDatum entry) {
 
-    }
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
+  @Override
+  public void prepare(Object configurationObject) {
 
-    }
+  }
 
-    @Override
-    public void cleanUp() {
-        LOGGER.debug("Writer Clean Up!");
-    }
+  @Override
+  public void cleanUp() {
+    LOGGER.debug("Writer Clean Up!");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/SystemOutWriter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/SystemOutWriter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/SystemOutWriter.java
index 76ce353..2711ae1 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/SystemOutWriter.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/SystemOutWriter.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.test.writer;
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,25 +29,25 @@ import org.slf4j.LoggerFactory;
  */
 public class SystemOutWriter implements StreamsPersistWriter {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(SystemOutWriter.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(SystemOutWriter.class);
 
-    @Override
-    public String getId() {
-        return "SystemOutWriter";
-    }
+  @Override
+  public String getId() {
+    return "SystemOutWriter";
+  }
 
-    @Override
-    public void write(StreamsDatum entry) {
-        System.out.println(entry.document);
-    }
+  @Override
+  public void write(StreamsDatum entry) {
+    System.out.println(entry.document);
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
+  @Override
+  public void prepare(Object configurationObject) {
 
-    }
+  }
 
-    @Override
-    public void cleanUp() {
-        LOGGER.debug("Clean up called writer!");
-    }
+  @Override
+  public void cleanUp() {
+    LOGGER.debug("Clean up called writer!");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java
index 80d4a24..16b98c4 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java
@@ -20,58 +20,57 @@ package org.apache.streams.test.component;
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
-import static org.junit.Assert.*;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Scanner;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
 /**
  * Created by rebanks on 2/27/14.
  */
 public class ExpectedDatumsPersistWriter implements StreamsPersistWriter{
 
-    @Override
-    public String getId() {
-        return "ExpectedDatumsPersistWriter";
-    }
+  @Override
+  public String getId() {
+    return "ExpectedDatumsPersistWriter";
+  }
 
-    private StreamsDatumConverter converter;
-    private String fileName;
-    private List<StreamsDatum> expectedDatums;
-    private int counted = 0;
-    private int expectedSize = 0;
+  private StreamsDatumConverter converter;
+  private String fileName;
+  private List<StreamsDatum> expectedDatums;
+  private int counted = 0;
+  private int expectedSize = 0;
 
-    public ExpectedDatumsPersistWriter(StreamsDatumConverter converter, String filePathInResources) {
-        this.converter = converter;
-        this.fileName = filePathInResources;
-    }
+  public ExpectedDatumsPersistWriter(StreamsDatumConverter converter, String filePathInResources) {
+    this.converter = converter;
+    this.fileName = filePathInResources;
+  }
 
 
 
-    @Override
-    public void write(StreamsDatum entry) {
-        int index = this.expectedDatums.indexOf(entry);
-        assertNotEquals("Datum not expected. "+entry.toString(), -1, index);
-        this.expectedDatums.remove(index);
-        ++this.counted;
-    }
+  @Override
+  public void write(StreamsDatum entry) {
+    int index = this.expectedDatums.indexOf(entry);
+    assertNotEquals("Datum not expected. "+entry.toString(), -1, index);
+    this.expectedDatums.remove(index);
+    ++this.counted;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        Scanner scanner = new Scanner(ExpectedDatumsPersistWriter.class.getResourceAsStream(this.fileName));
-        this.expectedDatums = new LinkedList<StreamsDatum>();
-        while(scanner.hasNextLine()) {
-            this.expectedDatums.add(this.converter.convert(scanner.nextLine()));
-        }
-        this.expectedSize = this.expectedDatums.size();
+  @Override
+  public void prepare(Object configurationObject) {
+    Scanner scanner = new Scanner(ExpectedDatumsPersistWriter.class.getResourceAsStream(this.fileName));
+    this.expectedDatums = new LinkedList<StreamsDatum>();
+    while(scanner.hasNextLine()) {
+      this.expectedDatums.add(this.converter.convert(scanner.nextLine()));
     }
+    this.expectedSize = this.expectedDatums.size();
+  }
 
-    @Override
-    public void cleanUp() {
-        assertEquals("Did not received the expected number of StreamsDatums", this.expectedSize, this.counted);
-    }
+  @Override
+  public void cleanUp() {
+    assertEquals("Did not received the expected number of StreamsDatums", this.expectedSize, this.counted);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
index 41e7eed..0fbfae9 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
@@ -18,10 +18,11 @@
 
 package org.apache.streams.test.component;
 
-import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+
+import com.google.common.collect.Queues;
 import org.joda.time.DateTime;
 
 import java.math.BigInteger;
@@ -37,64 +38,64 @@ import java.util.Scanner;
  */
 public class FileReaderProvider implements StreamsProvider {
 
-    private String fileName;
-    private Scanner scanner;
-    private StreamsDatumConverter converter;
-
-    public FileReaderProvider(String filePathInResources, StreamsDatumConverter converter) {
-        this.fileName = filePathInResources;
-        this.converter = converter;
-    }
-
-    @Override
-    public String getId() {
-        return "FileReaderProvider";
-    }
-
-    @Override
-    public void startStream() {
-
-    }
-
-    @Override
-    public StreamsResultSet readCurrent() {
-        return new StreamsResultSet(constructQueue(this.scanner));
-    }
-
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean isRunning() {
-        return this.scanner != null && this.scanner.hasNextLine();
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        this.scanner = new Scanner(FileReaderProvider.class.getResourceAsStream(this.fileName));
-    }
-
-    @Override
-    public void cleanUp() {
-        if(this.scanner!= null) {
-            this.scanner.close();
-            this.scanner = null;
-        }
+  private String fileName;
+  private Scanner scanner;
+  private StreamsDatumConverter converter;
+
+  public FileReaderProvider(String filePathInResources, StreamsDatumConverter converter) {
+    this.fileName = filePathInResources;
+    this.converter = converter;
+  }
+
+  @Override
+  public String getId() {
+    return "FileReaderProvider";
+  }
+
+  @Override
+  public void startStream() {
+
+  }
+
+  @Override
+  public StreamsResultSet readCurrent() {
+    return new StreamsResultSet(constructQueue(this.scanner));
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isRunning() {
+    return this.scanner != null && this.scanner.hasNextLine();
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    this.scanner = new Scanner(FileReaderProvider.class.getResourceAsStream(this.fileName));
+  }
+
+  @Override
+  public void cleanUp() {
+    if(this.scanner!= null) {
+      this.scanner.close();
+      this.scanner = null;
     }
+  }
 
-    private Queue<StreamsDatum> constructQueue(Scanner scanner) {
-        Queue<StreamsDatum> data = Queues.newLinkedBlockingQueue();
-        while(scanner.hasNextLine()) {
-            data.add(converter.convert(scanner.nextLine()));
-        }
-        cleanUp();
-        return data;
+  private Queue<StreamsDatum> constructQueue(Scanner scanner) {
+    Queue<StreamsDatum> data = Queues.newLinkedBlockingQueue();
+    while(scanner.hasNextLine()) {
+      data.add(converter.convert(scanner.nextLine()));
     }
+    cleanUp();
+    return data;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java
index e3b7dd1..9172167 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java
@@ -27,5 +27,5 @@ import java.io.Serializable;
  */
 public interface StreamsDatumConverter extends Serializable {
 
-    public StreamsDatum convert(String s);
+  public StreamsDatum convert(String s);
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java
index 6f4e620..3727aa1 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java
@@ -25,9 +25,9 @@ import org.apache.streams.core.StreamsDatum;
  */
 public class StringToDocumentConverter implements StreamsDatumConverter {
 
-    @Override
-    public StreamsDatum convert(String s) {
-        return new StreamsDatum(s);
-    }
+  @Override
+  public StreamsDatum convert(String s) {
+    return new StreamsDatum(s);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestComponentsLocalStream.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestComponentsLocalStream.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestComponentsLocalStream.java
index 935c8fe..5154ea3 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestComponentsLocalStream.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestComponentsLocalStream.java
@@ -23,6 +23,7 @@ import org.apache.streams.test.component.ExpectedDatumsPersistWriter;
 import org.apache.streams.test.component.FileReaderProvider;
 import org.apache.streams.test.component.StringToDocumentConverter;
 import org.apache.streams.util.ComponentUtils;
+
 import org.junit.After;
 import org.junit.Test;
 
@@ -31,22 +32,22 @@ import org.junit.Test;
  */
 public class TestComponentsLocalStream {
 
-    @After
-    public void removeLocalMBeans() {
-        try {
-            ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
-        } catch (Exception e) {
-            //No op.  proceed to next test
-        }
+  @After
+  public void removeLocalMBeans() {
+    try {
+      ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+    } catch (Exception e) {
+      //No op.  proceed to next test
     }
+  }
 
-    @Test
-    public void testLocalStreamWithComponent() {
-        LocalStreamBuilder builder = new LocalStreamBuilder();
-        builder.newReadCurrentStream("provider", new FileReaderProvider("/TestFile.txt",
-                                                                        new StringToDocumentConverter()));
-        builder.addStreamsPersistWriter("writer", new ExpectedDatumsPersistWriter(new StringToDocumentConverter(),
-                "/TestFile.txt"), 1, "provider")
+  @Test
+  public void testLocalStreamWithComponent() {
+    LocalStreamBuilder builder = new LocalStreamBuilder();
+    builder.newReadCurrentStream("provider", new FileReaderProvider("/TestFile.txt",
+        new StringToDocumentConverter()));
+    builder.addStreamsPersistWriter("writer", new ExpectedDatumsPersistWriter(new StringToDocumentConverter(),
+        "/TestFile.txt"), 1, "provider")
         .start();
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java
index 11e891b..0535295 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java
@@ -22,8 +22,8 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.test.component.ExpectedDatumsPersistWriter;
 import org.apache.streams.test.component.StringToDocumentConverter;
 import org.apache.streams.util.ComponentUtils;
+
 import org.junit.After;
-import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -31,37 +31,37 @@ import org.junit.Test;
  */
 public class TestExpectedDatumsPersitWriter {
 
-    private static final StreamsDatum[] INPUT_DATUMS = new StreamsDatum[] {
-            new StreamsDatum("Document1"),
-            new StreamsDatum("Document2"),
-            new StreamsDatum("Document3"),
-            new StreamsDatum("Document4")
+  private static final StreamsDatum[] INPUT_DATUMS = new StreamsDatum[] {
+      new StreamsDatum("Document1"),
+      new StreamsDatum("Document2"),
+      new StreamsDatum("Document3"),
+      new StreamsDatum("Document4")
 //            Uncomment to prove failures occur, or comment out a datum above
 //            ,new StreamsDatum("Document5")
-    };
+  };
 
-    @After
-    public void removeLocalMBeans() {
-        try {
-            ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
-        } catch (Exception e) {
-            //No op.  proceed to next test
-        }
+  @After
+  public void removeLocalMBeans() {
+    try {
+      ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+    } catch (Exception e) {
+      //No op.  proceed to next test
     }
+  }
 
-    @Test
-    public void testExpectedDatumsPersistWriterFileName() {
-        testDatums(new ExpectedDatumsPersistWriter(new StringToDocumentConverter(), "/TestFile.txt"));
-    }
+  @Test
+  public void testExpectedDatumsPersistWriterFileName() {
+    testDatums(new ExpectedDatumsPersistWriter(new StringToDocumentConverter(), "/TestFile.txt"));
+  }
 
 
 
-    private void testDatums(ExpectedDatumsPersistWriter writer) {
-        writer.prepare(null);
-        for(StreamsDatum datum : INPUT_DATUMS) {
-            writer.write(datum);
-        }
-        writer.cleanUp();
+  private void testDatums(ExpectedDatumsPersistWriter writer) {
+    writer.prepare(null);
+    for(StreamsDatum datum : INPUT_DATUMS) {
+      writer.write(datum);
     }
+    writer.cleanUp();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java
index 1ae9a24..a2b7bba 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java
@@ -23,41 +23,39 @@ import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.test.component.FileReaderProvider;
 import org.apache.streams.test.component.StringToDocumentConverter;
 import org.apache.streams.util.ComponentUtils;
+
 import org.junit.After;
-import org.junit.Ignore;
 import org.junit.Test;
 
-import java.io.InputStream;
-
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 /**
  *
  */
 public class TestFileReaderProvider {
 
-    @After
-    public void removeLocalMBeans() {
-        try {
-            ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
-        } catch (Exception e) {
-            //No op.  proceed to next test
-        }
+  @After
+  public void removeLocalMBeans() {
+    try {
+      ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+    } catch (Exception e) {
+      //No op.  proceed to next test
     }
+  }
 
-    @Test
-    public void testFileReaderProviderFileName() {
-        String fileName = "/TestFile.txt";
-        FileReaderProvider provider = new FileReaderProvider(fileName, new StringToDocumentConverter());
-        provider.prepare(null);
-        StreamsResultSet resultSet = provider.readCurrent();
-        int count = 0;
-        for(StreamsDatum datum : resultSet) {
-            ++count;
-        }
-        assertEquals(4, count);
-        provider.cleanUp();
+  @Test
+  public void testFileReaderProviderFileName() {
+    String fileName = "/TestFile.txt";
+    FileReaderProvider provider = new FileReaderProvider(fileName, new StringToDocumentConverter());
+    provider.prepare(null);
+    StreamsResultSet resultSet = provider.readCurrent();
+    int count = 0;
+    for(StreamsDatum datum : resultSet) {
+      ++count;
     }
+    assertEquals(4, count);
+    provider.cleanUp();
+  }
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
index 9b887af..44ade9c 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
@@ -19,54 +19,47 @@
 
 package org.apache.streams.pig;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.data.ActivityConverter;
-import org.apache.streams.data.ActivitySerializer;
-import org.slf4j.Logger;
 
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
 
 /**
- * Static reflection wrappers for instantiating StreamsComponents
+ * Static reflection wrappers for instantiating StreamsComponents.
  */
 public class StreamsComponentFactory {
 
-    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsComponentFactory.class);
+  private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsComponentFactory.class);
 
-    public static ActivityConverter getConverterInstance(Class<?> converterClazz) {
+  public static ActivityConverter getConverterInstance(Class<?> converterClazz) {
 
-        Object object = null;
-        try {
-            object = converterClazz.getConstructor().newInstance();
-        } catch (Exception e) {
-            LOGGER.error(e.getMessage());
-        }
-
-        Preconditions.checkNotNull(object);
+    Object object = null;
+    try {
+      object = converterClazz.getConstructor().newInstance();
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage());
+    }
 
-        ActivityConverter converter = (ActivityConverter) object;
+    Preconditions.checkNotNull(object);
 
-        return converter;
+    ActivityConverter converter = (ActivityConverter) object;
 
-    }
+    return converter;
 
-    public static StreamsProcessor getProcessorInstance(Class<?> processorClazz) {
+  }
 
-        Object object = null;
-        try {
-            object = processorClazz.getConstructor().newInstance();
-        } catch (Exception e) {
-            LOGGER.error(e.getMessage());
-        }
-        StreamsProcessor processor = (StreamsProcessor) object;
-        return processor;
+  public static StreamsProcessor getProcessorInstance(Class<?> processorClazz) {
 
+    Object object = null;
+    try {
+      object = processorClazz.getConstructor().newInstance();
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage());
     }
+    StreamsProcessor processor = (StreamsProcessor) object;
+    return processor;
+
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java
deleted file mode 100644
index 5ff4145..0000000
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.pig;
-
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.core.StreamBuilder;
-import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.core.StreamsProvider;
-import org.joda.time.DateTime;
-
-import java.math.BigInteger;
-
-/**
- * Goal is to be able to build a pig workflow using same syntax as other
- * StreamsBuilders
- *
- * Currently implementers must write own pig scripts to use this module
- */
-public class StreamsPigBuilder implements StreamBuilder {
-
-    @Override
-    public StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration) {
-        return null;
-    }
-
-    @Override
-    public StreamsConfiguration getStreamsConfiguration() {
-        return null;
-    }
-
-    @Override
-    public StreamBuilder addStreamsProcessor(String s, StreamsProcessor streamsProcessor, int i, String... strings) {
-        return null;
-    }
-
-    @Override
-    public StreamBuilder addStreamsPersistWriter(String s, StreamsPersistWriter streamsPersistWriter, int i, String... strings) {
-        return null;
-    }
-
-    @Override
-    public StreamBuilder newPerpetualStream(String s, StreamsProvider streamsProvider) {
-        return null;
-    }
-
-    @Override
-    public StreamBuilder newReadCurrentStream(String s, StreamsProvider streamsProvider) {
-        return null;
-    }
-
-    @Override
-    public StreamBuilder newReadNewStream(String s, StreamsProvider streamsProvider, BigInteger bigInteger) {
-        return null;
-    }
-
-    @Override
-    public StreamBuilder newReadRangeStream(String s, StreamsProvider streamsProvider, DateTime dateTime, DateTime dateTime2) {
-        return null;
-    }
-
-    @Override
-    public void start() {
-
-    }
-
-    @Override
-    public void stop() {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
index 74f7eb5..cd08020 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
@@ -19,21 +19,25 @@
 
 package org.apache.streams.pig;
 
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import datafu.pig.util.AliasableEvalFunc;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.EvalFunc;
 import org.apache.pig.builtin.MonitoredUDF;
-import org.apache.pig.data.*;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.UDFContext;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.data.util.RFC3339Utils;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 
@@ -48,40 +52,40 @@ import java.util.concurrent.TimeUnit;
 @MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
 public class StreamsProcessDatumExec extends AliasableEvalFunc<DataBag> {
 
-    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsProcessDatumExec.class);
-
-    TupleFactory mTupleFactory = TupleFactory.getInstance();
-    BagFactory mBagFactory = BagFactory.getInstance();
-
-    StreamsProcessor streamsProcessor;
-
-    ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    public StreamsProcessDatumExec(String... execArgs) throws ClassNotFoundException{
-        Preconditions.checkNotNull(execArgs);
-        Preconditions.checkArgument(execArgs.length > 0);
-        String classFullName = execArgs[0];
-        Preconditions.checkNotNull(classFullName);
-        String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0);
-        streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
-        if( execArgs.length == 1 ) {
-            LOGGER.debug("prepare (null)");
-            streamsProcessor.prepare(null);
-        } else if( execArgs.length > 1 ) {
-            LOGGER.debug("prepare " + Arrays.toString(prepareArgs));
-            streamsProcessor.prepare(prepareArgs);
-        }
+  private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsProcessDatumExec.class);
+
+  TupleFactory mTupleFactory = TupleFactory.getInstance();
+  BagFactory mBagFactory = BagFactory.getInstance();
+
+  StreamsProcessor streamsProcessor;
+
+  ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  public StreamsProcessDatumExec(String... execArgs) throws ClassNotFoundException{
+    Preconditions.checkNotNull(execArgs);
+    Preconditions.checkArgument(execArgs.length > 0);
+    String classFullName = execArgs[0];
+    Preconditions.checkNotNull(classFullName);
+    String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0);
+    streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
+    if( execArgs.length == 1 ) {
+      LOGGER.debug("prepare (null)");
+      streamsProcessor.prepare(null);
+    } else if( execArgs.length > 1 ) {
+      LOGGER.debug("prepare " + Arrays.toString(prepareArgs));
+      streamsProcessor.prepare(prepareArgs);
     }
+  }
 
-    @Override
-    public DataBag exec(Tuple input) throws IOException {
+  @Override
+  public DataBag exec(Tuple input) throws IOException {
 
-        if (input == null || input.size() == 0)
-            return null;
+    if (input == null || input.size() == 0)
+      return null;
 
-        DataBag output = BagFactory.getInstance().newDefaultBag();
+    DataBag output = BagFactory.getInstance().newDefaultBag();
 
-        Configuration conf = UDFContext.getUDFContext().getJobConf();
+    Configuration conf = UDFContext.getUDFContext().getJobConf();
 
 //      I would prefer it work this way, but at the moment it doesn't
 
@@ -95,91 +99,91 @@ public class StreamsProcessDatumExec extends AliasableEvalFunc<DataBag> {
 //        }
 //        String object = getString(input, "object");
 
-        String id = (String) input.get(0);
-        String source = (String) input.get(1);
-        Long timestamp;
-        try {
-            timestamp = (Long) input.get(2);
-        } catch( Exception e ) {
-            timestamp = RFC3339Utils.parseUTC((String)input.get(2)).getMillis();
-        }
-        String object = (String) input.get(3);
+    String id = (String) input.get(0);
+    String source = (String) input.get(1);
+    Long timestamp;
+    try {
+      timestamp = (Long) input.get(2);
+    } catch( Exception e ) {
+      timestamp = RFC3339Utils.parseUTC((String)input.get(2)).getMillis();
+    }
+    String object = (String) input.get(3);
 
-        StreamsDatum entry = new StreamsDatum(object, id, new DateTime(timestamp));
+    StreamsDatum entry = new StreamsDatum(object, id, new DateTime(timestamp));
 
-        List<StreamsDatum> resultSet = streamsProcessor.process(entry);
-        List<Tuple> resultTupleList = Lists.newArrayList();
+    List<StreamsDatum> resultSet = streamsProcessor.process(entry);
+    List<Tuple> resultTupleList = Lists.newArrayList();
 
-        for( StreamsDatum resultDatum : resultSet ) {
-            Tuple tuple = mTupleFactory.newTuple();
-            tuple.append(id);
-            tuple.append(source);
-            tuple.append(timestamp);
+    for( StreamsDatum resultDatum : resultSet ) {
+      Tuple tuple = mTupleFactory.newTuple();
+      tuple.append(id);
+      tuple.append(source);
+      tuple.append(timestamp);
 
-            if( resultDatum.getDocument() instanceof String )
-                tuple.append(resultDatum.getDocument());
-            else
-                tuple.append(mapper.writeValueAsString(resultDatum.getDocument()));
-            resultTupleList.add(tuple);
-        }
+      if( resultDatum.getDocument() instanceof String )
+        tuple.append(resultDatum.getDocument());
+      else
+        tuple.append(mapper.writeValueAsString(resultDatum.getDocument()));
+      resultTupleList.add(tuple);
+    }
 
-        DataBag result = mBagFactory.newDefaultBag(resultTupleList);
+    DataBag result = mBagFactory.newDefaultBag(resultTupleList);
 
-        return result;
+    return result;
 
-    }
+  }
 
-    public void finish() {
-        streamsProcessor.cleanUp();
+  public void finish() {
+    streamsProcessor.cleanUp();
+  }
+
+  @Override
+  public Schema getOutputSchema(Schema schema) {
+    // Check that we were passed two fields
+    String error = "Expected: id\tsource\ttimestamp\tobject";
+    if (schema.size() != 4) {
+      throw new RuntimeException(error);
     }
 
-    @Override
-    public Schema getOutputSchema(Schema schema) {
-        // Check that we were passed two fields
-        String error = "Expected: id\tsource\ttimestamp\tobject";
-        if (schema.size() != 4) {
-            throw new RuntimeException(error);
-        }
-
-        try {
-            // Get the types for both columns and check them.  If they are
-            // wrong, figure out what types were passed and give a good error
-            // message.
-            if (schema.getField(0).type != DataType.CHARARRAY &&
-                    schema.getField(0).type != DataType.LONG) {
-                error += "Problem with id: must be CHARARRAY or LONG";
-                error += "\t(";
-                error += DataType.findTypeName(schema.getField(0).type);
-                error += ")\n";
-                throw new RuntimeException(error);
-            }
-            if (schema.getField(1).type != DataType.CHARARRAY) {
-                error += "Problem with source: must be CHARARRAY";
-                error += "\t(";
-                error += DataType.findTypeName(schema.getField(1).type);
-                error += ")\n";
-                throw new RuntimeException(error);
-            }
-            if (schema.getField(2).type != DataType.CHARARRAY &&
-                    schema.getField(2).type != DataType.LONG) {
-                error += "Problem with timestamp: must be CHARARRAY or LONG";
-                error += "\t(";
-                error += DataType.findTypeName(schema.getField(2).type);
-                error += ")\n";
-                throw new RuntimeException(error);
-            }
-            if (schema.getField(3).type != DataType.CHARARRAY) {
-                error += "Problem with object: must be CHARARRAY";
-                error += "\t(";
-                error += DataType.findTypeName(schema.getField(3).type);
-                error += ")\n";
-                throw new RuntimeException(error);
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(error);
-        }
-
-        // Always hand back the same schema we are passed
-        return schema;
+    try {
+      // Get the types for both columns and check them.  If they are
+      // wrong, figure out what types were passed and give a good error
+      // message.
+      if (schema.getField(0).type != DataType.CHARARRAY &&
+          schema.getField(0).type != DataType.LONG) {
+        error += "Problem with id: must be CHARARRAY or LONG";
+        error += "\t(";
+        error += DataType.findTypeName(schema.getField(0).type);
+        error += ")\n";
+        throw new RuntimeException(error);
+      }
+      if (schema.getField(1).type != DataType.CHARARRAY) {
+        error += "Problem with source: must be CHARARRAY";
+        error += "\t(";
+        error += DataType.findTypeName(schema.getField(1).type);
+        error += ")\n";
+        throw new RuntimeException(error);
+      }
+      if (schema.getField(2).type != DataType.CHARARRAY &&
+          schema.getField(2).type != DataType.LONG) {
+        error += "Problem with timestamp: must be CHARARRAY or LONG";
+        error += "\t(";
+        error += DataType.findTypeName(schema.getField(2).type);
+        error += ")\n";
+        throw new RuntimeException(error);
+      }
+      if (schema.getField(3).type != DataType.CHARARRAY) {
+        error += "Problem with object: must be CHARARRAY";
+        error += "\t(";
+        error += DataType.findTypeName(schema.getField(3).type);
+        error += ")\n";
+        throw new RuntimeException(error);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(error);
     }
+
+    // Always hand back the same schema we are passed
+    return schema;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
index 788b347..2f40923 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
@@ -19,25 +19,15 @@
 
 package org.apache.streams.pig;
 
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import datafu.pig.util.SimpleEvalFunc;
 import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.EvalFunc;
 import org.apache.pig.builtin.MonitoredUDF;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -54,59 +44,59 @@ import java.util.concurrent.TimeUnit;
 @MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
 public class StreamsProcessDocumentExec extends SimpleEvalFunc<String> {
 
-    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsProcessDocumentExec.class);
-
-    StreamsProcessor streamsProcessor;
-    ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    public StreamsProcessDocumentExec(String... execArgs) throws ClassNotFoundException{
-        Preconditions.checkNotNull(execArgs);
-        Preconditions.checkArgument(execArgs.length > 0);
-        String classFullName = execArgs[0];
-        Preconditions.checkNotNull(classFullName);
-        String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0);
-        streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
-        if( execArgs.length == 1 ) {
-            LOGGER.debug("prepare (null)");
-            streamsProcessor.prepare(null);
-        } else if( execArgs.length > 1 ) {
-            LOGGER.debug("prepare " + Arrays.toString(prepareArgs));
-            streamsProcessor.prepare(prepareArgs);
-        }
+  private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsProcessDocumentExec.class);
+
+  StreamsProcessor streamsProcessor;
+  ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  public StreamsProcessDocumentExec(String... execArgs) throws ClassNotFoundException{
+    Preconditions.checkNotNull(execArgs);
+    Preconditions.checkArgument(execArgs.length > 0);
+    String classFullName = execArgs[0];
+    Preconditions.checkNotNull(classFullName);
+    String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0);
+    streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
+    if( execArgs.length == 1 ) {
+      LOGGER.debug("prepare (null)");
+      streamsProcessor.prepare(null);
+    } else if( execArgs.length > 1 ) {
+      LOGGER.debug("prepare " + Arrays.toString(prepareArgs));
+      streamsProcessor.prepare(prepareArgs);
     }
+  }
 
-    public String call(String document) throws IOException {
+  public String call(String document) throws IOException {
 
-        Preconditions.checkNotNull(streamsProcessor);
-        Preconditions.checkNotNull(document);
+    Preconditions.checkNotNull(streamsProcessor);
+    Preconditions.checkNotNull(document);
 
-        LOGGER.debug(document);
+    LOGGER.debug(document);
 
-        StreamsDatum entry = new StreamsDatum(document);
+    StreamsDatum entry = new StreamsDatum(document);
 
-        Preconditions.checkNotNull(entry);
+    Preconditions.checkNotNull(entry);
 
-        LOGGER.debug(entry.toString());
+    LOGGER.debug(entry.toString());
 
-        List<StreamsDatum> resultSet = streamsProcessor.process(entry);
+    List<StreamsDatum> resultSet = streamsProcessor.process(entry);
 
-        LOGGER.debug(resultSet.toString());
+    LOGGER.debug(resultSet.toString());
 
-        Object resultDoc = null;
-        for( StreamsDatum resultDatum : resultSet ) {
-            resultDoc = resultDatum.getDocument();
-        }
+    Object resultDoc = null;
+    for( StreamsDatum resultDatum : resultSet ) {
+      resultDoc = resultDatum.getDocument();
+    }
 
-        Preconditions.checkNotNull(resultDoc);
+    Preconditions.checkNotNull(resultDoc);
 
-        if( resultDoc instanceof String )
-            return (String) resultDoc;
-        else
-            return mapper.writeValueAsString(resultDoc);
+    if( resultDoc instanceof String )
+      return (String) resultDoc;
+    else
+      return mapper.writeValueAsString(resultDoc);
 
-    }
+  }
 
-    public void finish() {
-        streamsProcessor.cleanUp();
-    }
+  public void finish() {
+    streamsProcessor.cleanUp();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java
deleted file mode 100644
index 7692763..0000000
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.pig;
-
-import org.apache.pig.builtin.PigStorage;
-
-/**
- * It would be nice if streams persisters could be used for input / output
- * within the pig runtime.
- */
-public class StreamsStorage extends PigStorage {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java
index 4db38fd..a48a5e8 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java
@@ -21,46 +21,47 @@ package org.apache.streams.pig.test;
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+
 import org.slf4j.Logger;
 
 import java.util.LinkedList;
 import java.util.List;
 
 /**
- * Used to Test Pig processor wrapper with arguments to prepare method
+ * Used to Test Pig processor wrapper with arguments to prepare method.
  */
 public class AppendStringProcessor implements StreamsProcessor {
 
-    public final static String STREAMS_ID = "AppendStringProcessor";
+  public final static String STREAMS_ID = "AppendStringProcessor";
 
-    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(AppendStringProcessor.class);
+  private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(AppendStringProcessor.class);
 
-    String append;
+  String append;
 
-    public AppendStringProcessor() {
-    }
+  public AppendStringProcessor() {
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        List<StreamsDatum> resultSet;
-        resultSet = new LinkedList<StreamsDatum>();
-        String value = (String) entry.getDocument()+ new String(append);
-        resultSet.add(new StreamsDatum(value));
-        return resultSet;
-    }
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    List<StreamsDatum> resultSet;
+    resultSet = new LinkedList<StreamsDatum>();
+    String value = (String) entry.getDocument()+ new String(append);
+    resultSet.add(new StreamsDatum(value));
+    return resultSet;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        append = ((String[]) configurationObject)[0];
-    }
+  @Override
+  public void prepare(Object configurationObject) {
+    append = ((String[]) configurationObject)[0];
+  }
 
-    @Override
-    public void cleanUp() {
-        LOGGER.info("Processor clean up");
-    }
+  @Override
+  public void cleanUp() {
+    LOGGER.info("Processor clean up");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java
index 2b687b1..5336007 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java
@@ -21,46 +21,47 @@ package org.apache.streams.pig.test;
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+
 import org.slf4j.Logger;
 
 import java.util.LinkedList;
 import java.util.List;
 
 /**
- * Used to Test Pig processor wrapper when multiple datums are returned
+ * Used to Test Pig processor wrapper when multiple datums are returned.
  */
 public class CopyThriceProcessor implements StreamsProcessor {
 
-    public final static String STREAMS_ID = "CopyThriceProcessor";
+  public final static String STREAMS_ID = "CopyThriceProcessor";
 
-    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(CopyThriceProcessor.class);
+  private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(CopyThriceProcessor.class);
 
-    List<StreamsDatum> result;
+  List<StreamsDatum> result;
 
-    public CopyThriceProcessor() {
-    }
+  public CopyThriceProcessor() {
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        this.result = new LinkedList<StreamsDatum>();
-        result.add(entry);
-        result.add(entry);
-        result.add(entry);
-        return result;
-    }
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    this.result = new LinkedList<StreamsDatum>();
+    result.add(entry);
+    result.add(entry);
+    result.add(entry);
+    return result;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
+  @Override
+  public void prepare(Object configurationObject) {
 
-    }
+  }
 
-    @Override
-    public void cleanUp() {
-        LOGGER.info("Processor clean up");
-    }
+  @Override
+  public void cleanUp() {
+    LOGGER.info("Processor clean up");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java
index 5528a38..07d3b6f 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java
@@ -21,44 +21,45 @@ package org.apache.streams.pig.test;
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+
 import org.slf4j.Logger;
 
 import java.util.LinkedList;
 import java.util.List;
 
 /**
- * Used to Test Pig processor wrapper - datum passthrough
+ * Used to Test Pig processor wrapper - datum passthrough.
  */
 public class DoNothingProcessor implements StreamsProcessor {
 
-    public final static String STREAMS_ID = "DoNothingProcessor";
+  public final static String STREAMS_ID = "DoNothingProcessor";
 
-    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DoNothingProcessor.class);
+  private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DoNothingProcessor.class);
 
-    List<StreamsDatum> result;
+  List<StreamsDatum> result;
 
-    public DoNothingProcessor() {
-    }
+  public DoNothingProcessor() {
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        this.result = new LinkedList<StreamsDatum>();
-        result.add(entry);
-        return result;
-    }
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    this.result = new LinkedList<StreamsDatum>();
+    result.add(entry);
+    return result;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        LOGGER.info("Processor prepare");
-    }
+  @Override
+  public void prepare(Object configurationObject) {
+    LOGGER.info("Processor prepare");
+  }
 
-    @Override
-    public void cleanUp() {
-        LOGGER.info("Processor clean up");
-    }
+  @Override
+  public void cleanUp() {
+    LOGGER.info("Processor clean up");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigConverterTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigConverterTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigConverterTest.java
index 5dad52c..a983cc7 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigConverterTest.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigConverterTest.java
@@ -19,13 +19,14 @@
 
 package org.apache.streams.pig.test;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.pig.pigunit.PigTest;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
 import org.apache.streams.twitter.converter.TwitterJsonRetweetActivityConverter;
 import org.apache.streams.twitter.pojo.Retweet;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.pig.pigunit.PigTest;
 import org.apache.tools.ant.util.StringUtils;
 import org.junit.Test;
 
@@ -34,23 +35,23 @@ import org.junit.Test;
  */
 public class PigConverterTest {
 
-    @Test
-    public void testPigConverter() throws Exception {
+  @Test
+  public void testPigConverter() throws Exception {
 
-        String[] input = {
-                "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{\"retweeted_status\":{\"contributors\":null,\"text\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[80,100],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[106,120],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159470076259602432,\"source\":\"<a href=\\\"http://www.hootsuite.com\\\" rel=\\\"nofollow\\\">HootSuite<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":71,\"create
 d_at\":\"Wed Jan 18 03:00:03 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":14,\"id_str\":\"159470076259602432\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":70754,\"lang\":\"en\",\"profile_link_color\":\"1B4F89\",\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\",\"id\":14293310,\"following\":false,\"protected\":false,\"favourites_count\":59,\"profile_text_color\":\"000000\",\"description\":\"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Service.\",\"verified\":true,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"000000\",\"name\":\"TIME.com\",\"profile_background_color\":\"CC0000\",\"created_at\":\"Thu Apr 03 13:54:30 +0000 2008\",\"default_profile_image\":false,\"followers_count\":5146268,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/1
 700796190/Picture_24_normal.png\",\"geo_enabled\":false,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"url\":\"http://t.co/4aYbUuAeSh\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":742,\"profile_sidebar_fill_color\":\"D9D9D9\",\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"listed_count\":76944,\"is_translator\":false},\"coordinates\":null},\"contr
 ibutors\":null,\"text\":\"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[90,110],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":14293310,\"name\":\"TIME.com\",\"indices\":[3,8],\"screen_name\":\"TIME\",\"id_str\":\"14293310\"},{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[116,130],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159475541894897679,\"source\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retwe
 et_count\":71,\"created_at\":\"Wed Jan 18 03:21:46 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"159475541894897679\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":5053,\"lang\":\"en\",\"profile_link_color\":\"738D84\",\"id\":27552112,\"following\":false,\"protected\":false,\"favourites_count\":52,\"profile_text_color\":\"97CEC9\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"A9AC00\",\"name\":\"rafael medina-flores\",\"profile_background_color\":\"C5EFE3\",\"created_at\":\"Mon Mar 30 01:21:55 +0000 2009\",\"default_profile_image\":false,\"followers_count\":963,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"geo_enabled\":true,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"profile_background_image_url_https\":\"
 https://si0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":-25200,\"time_zone\":\"Mountain Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1800,\"profile_sidebar_fill_color\":\"5C4F3C\",\"screen_name\":\"rmedinaflores\",\"id_str\":\"27552112\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"listed_count\":50,\"is_translator\":false},\"coordinates\":null}"
-        };
+    String[] input = {
+        "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{\"retweeted_status\":{\"contributors\":null,\"text\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[80,100],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[106,120],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159470076259602432,\"source\":\"<a href=\\\"http://www.hootsuite.com\\\" rel=\\\"nofollow\\\">HootSuite<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":71,\"created_at\":\
 "Wed Jan 18 03:00:03 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":14,\"id_str\":\"159470076259602432\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":70754,\"lang\":\"en\",\"profile_link_color\":\"1B4F89\",\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\",\"id\":14293310,\"following\":false,\"protected\":false,\"favourites_count\":59,\"profile_text_color\":\"000000\",\"description\":\"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Service.\",\"verified\":true,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"000000\",\"name\":\"TIME.com\",\"profile_background_color\":\"CC0000\",\"created_at\":\"Thu Apr 03 13:54:30 +0000 2008\",\"default_profile_image\":false,\"followers_count\":5146268,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/170079619
 0/Picture_24_normal.png\",\"geo_enabled\":false,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"url\":\"http://t.co/4aYbUuAeSh\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":742,\"profile_sidebar_fill_color\":\"D9D9D9\",\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"listed_count\":76944,\"is_translator\":false},\"coordinates\":null},\"contributors\
 ":null,\"text\":\"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[90,110],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":14293310,\"name\":\"TIME.com\",\"indices\":[3,8],\"screen_name\":\"TIME\",\"id_str\":\"14293310\"},{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[116,130],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159475541894897679,\"source\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count
 \":71,\"created_at\":\"Wed Jan 18 03:21:46 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"159475541894897679\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":5053,\"lang\":\"en\",\"profile_link_color\":\"738D84\",\"id\":27552112,\"following\":false,\"protected\":false,\"favourites_count\":52,\"profile_text_color\":\"97CEC9\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"A9AC00\",\"name\":\"rafael medina-flores\",\"profile_background_color\":\"C5EFE3\",\"created_at\":\"Mon Mar 30 01:21:55 +0000 2009\",\"default_profile_image\":false,\"followers_count\":963,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"geo_enabled\":true,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"profile_background_image_url_https\":\"https://
 si0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":-25200,\"time_zone\":\"Mountain Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1800,\"profile_sidebar_fill_color\":\"5C4F3C\",\"screen_name\":\"rmedinaflores\",\"id_str\":\"27552112\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"listed_count\":50,\"is_translator\":false},\"coordinates\":null}"
+    };
 
-        String doc = (String) StringUtils.split(input[0], '\t').get(3);
-        ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(new TwitterDateTimeFormat().getFormat()));
-        String outdoc = mapper.writeValueAsString(new TwitterJsonRetweetActivityConverter().toActivityList(mapper.readValue(doc, Retweet.class)).get(0));
+    String doc = (String) StringUtils.split(input[0], '\t').get(3);
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(new TwitterDateTimeFormat().getFormat()));
+    String outdoc = mapper.writeValueAsString(new TwitterJsonRetweetActivityConverter().toActivityList(mapper.readValue(doc, Retweet.class)).get(0));
 
-        String[] output = new String[1];
-        output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + outdoc + ")";
+    String[] output = new String[1];
+    output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + outdoc + ")";
 
-        PigTest test;
-        test = new PigTest("src/test/resources/pigconvertertest.pig");
-        test.assertOutput("in", input, "out", output);
+    PigTest test;
+    test = new PigTest("src/test/resources/pigconvertertest.pig");
+    test.assertOutput("in", input, "out", output);
 
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java
index 80b17b4..1cb7252 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java
@@ -19,74 +19,74 @@
 
 package org.apache.streams.pig.test;
 
-import org.apache.pig.pigunit.PigTest;
 import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.twitter.converter.TwitterJsonTweetActivityConverter;
+
+import org.apache.pig.pigunit.PigTest;
 import org.apache.tools.ant.util.StringUtils;
 import org.junit.Test;
 
 import java.util.List;
 
 /**
- * These are tests for StreamsProcessDatumExec
+ * These are tests for StreamsProcessDatumExec.
  */
 public class PigProcessDatumTest {
 
-    @Test
-    public void testPigDoNothingSingleDatum() throws Exception {
-        String[] args = {};
+  @Test
+  public void testPigDoNothingSingleDatum() throws Exception {
+    String[] args = {};
 
-        String[] input = {
-                "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}",
-        };
+    String[] input = {
+        "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}",
+    };
 
-        DoNothingProcessor processor = new DoNothingProcessor();
+    DoNothingProcessor processor = new DoNothingProcessor();
 
-        String doc = (String) StringUtils.split(input[0], '\t').get(3);
-        StreamsDatum inputDatum = new StreamsDatum(doc);
-        inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
+    String doc = (String) StringUtils.split(input[0], '\t').get(3);
+    StreamsDatum inputDatum = new StreamsDatum(doc);
+    inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
 
-        processor.prepare(null);
+    processor.prepare(null);
 
-        StreamsDatum resultDatum = processor.process(inputDatum).get(0);
-        String resultDocument = (String) resultDatum.getDocument();
+    StreamsDatum resultDatum = processor.process(inputDatum).get(0);
+    String resultDocument = (String) resultDatum.getDocument();
 
-        String[] output = new String[1];
-        output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + resultDocument + ")";
+    String[] output = new String[1];
+    output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + resultDocument + ")";
 
-        PigTest test;
-        test = new PigTest("src/test/resources/pigprocessdatumtest.pig", args);
-        test.assertOutput("in", input, "out", output);
+    PigTest test;
+    test = new PigTest("src/test/resources/pigprocessdatumtest.pig", args);
+    test.assertOutput("in", input, "out", output);
 
-    }
+  }
 
-    @Test
-    public void testPigCopyThriceSingleDatum() throws Exception {
-        String[] args = {};
+  @Test
+  public void testPigCopyThriceSingleDatum() throws Exception {
+    String[] args = {};
 
-        String[] input = {
-                "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}",
-        };
+    String[] input = {
+        "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}",
+    };
 
-        CopyThriceProcessor processor = new CopyThriceProcessor();
+    CopyThriceProcessor processor = new CopyThriceProcessor();
 
-        String doc = (String) StringUtils.split(input[0], '\t').get(3);
-        StreamsDatum inputDatum = new StreamsDatum(doc);
-        inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
+    String doc = (String) StringUtils.split(input[0], '\t').get(3);
+    StreamsDatum inputDatum = new StreamsDatum(doc);
+    inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
 
-        processor.prepare(null);
+    processor.prepare(null);
 
-        List<StreamsDatum> resultSet = processor.process(inputDatum);
+    List<StreamsDatum> resultSet = processor.process(inputDatum);
 
-        String[] output = new String[resultSet.size()];
+    String[] output = new String[resultSet.size()];
 
-        for( int i = 0; i < output.length; i++ ) {
-            output[i] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + resultSet.get(i).getDocument() + ")";
-        }
+    for( int i = 0; i < output.length; i++ ) {
+      output[i] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + resultSet.get(i).getDocument() + ")";
+    }
 
-        PigTest test;
-        test = new PigTest("src/test/resources/pigprocessdatumcopytest.pig", args);
-        test.assertOutput("in", input, "out", output);
+    PigTest test;
+    test = new PigTest("src/test/resources/pigprocessdatumcopytest.pig", args);
+    test.assertOutput("in", input, "out", output);
 
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
index dd30eb1..2832fdc 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
@@ -19,94 +19,95 @@
 
 package org.apache.streams.pig.test;
 
-import org.apache.pig.pigunit.PigTest;
 import org.apache.streams.core.StreamsDatum;
+
+import org.apache.pig.pigunit.PigTest;
 import org.apache.tools.ant.util.StringUtils;
 import org.junit.Test;
 
 /**
- * These are tests for StreamsProcessDocumentExec
+ * These are tests for StreamsProcessDocumentExec.
  */
 public class PigProcessDocumentTest {
 
-    @Test
-    public void testPigProcessEmptyDocument() throws Exception {
+  @Test
+  public void testPigProcessEmptyDocument() throws Exception {
 
-        String[] input = {
-                "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{}"
-        };
+    String[] input = {
+        "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{}"
+    };
 
-        DoNothingProcessor processor = new DoNothingProcessor();
+    DoNothingProcessor processor = new DoNothingProcessor();
 
-        String doc = (String) StringUtils.split(input[0], '\t').get(3);
-        StreamsDatum inputDatum = new StreamsDatum(doc);
-        inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
+    String doc = (String) StringUtils.split(input[0], '\t').get(3);
+    StreamsDatum inputDatum = new StreamsDatum(doc);
+    inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
 
-        processor.prepare(null);
+    processor.prepare(null);
 
-        StreamsDatum resultDatum = processor.process(inputDatum).get(0);
-        String resultDocument = (String) resultDatum.getDocument();
+    StreamsDatum resultDatum = processor.process(inputDatum).get(0);
+    String resultDocument = (String) resultDatum.getDocument();
 
-        String[] output = new String[1];
-        output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")";
+    String[] output = new String[1];
+    output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")";
 
-        PigTest test;
-        test = new PigTest("src/test/resources/pigprocessdocumenttest.pig");
-        test.assertOutput("in", input, "out", output);
+    PigTest test;
+    test = new PigTest("src/test/resources/pigprocessdocumenttest.pig");
+    test.assertOutput("in", input, "out", output);
 
-    }
+  }
 
-    @Test
-    public void testPigProcessJsonDocument() throws Exception {
+  @Test
+  public void testPigProcessJsonDocument() throws Exception {
 
-        String[] input = {
-                "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}"
-        };
+    String[] input = {
+        "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}"
+    };
 
-        DoNothingProcessor processor = new DoNothingProcessor();
+    DoNothingProcessor processor = new DoNothingProcessor();
 
-        String doc = (String) StringUtils.split(input[0], '\t').get(3);
-        StreamsDatum inputDatum = new StreamsDatum(doc);
-        inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
+    String doc = (String) StringUtils.split(input[0], '\t').get(3);
+    StreamsDatum inputDatum = new StreamsDatum(doc);
+    inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
 
-        processor.prepare(null);
+    processor.prepare(null);
 
-        StreamsDatum resultDatum = processor.process(inputDatum).get(0);
-        String resultDocument = (String) resultDatum.getDocument();
+    StreamsDatum resultDatum = processor.process(inputDatum).get(0);
+    String resultDocument = (String) resultDatum.getDocument();
 
-        String[] output = new String[1];
-        output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")";
+    String[] output = new String[1];
+    output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")";
 
-        PigTest test;
-        test = new PigTest("src/test/resources/pigprocessdocumenttest.pig");
-        test.assertOutput("in", input, "out", output);
+    PigTest test;
+    test = new PigTest("src/test/resources/pigprocessdocumenttest.pig");
+    test.assertOutput("in", input, "out", output);
 
-    }
+  }
 
-    @Test
-    public void testPigProcessAppendDocument() throws Exception {
+  @Test
+  public void testPigProcessAppendDocument() throws Exception {
 
-        String[] input = {
-                "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\thowdy"
-        };
+    String[] input = {
+        "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\thowdy"
+    };
 
-        AppendStringProcessor processor = new AppendStringProcessor();
+    AppendStringProcessor processor = new AppendStringProcessor();
 
-        String doc = (String) StringUtils.split(input[0], '\t').get(3);
-        StreamsDatum inputDatum = new StreamsDatum(doc);
-        inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
+    String doc = (String) StringUtils.split(input[0], '\t').get(3);
+    StreamsDatum inputDatum = new StreamsDatum(doc);
+    inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
 
-        processor.prepare(new String[]{"doody"});
+    processor.prepare(new String[]{"doody"});
 
-        StreamsDatum resultDatum = processor.process(inputDatum).get(0);
-        String resultDocument = (String) resultDatum.getDocument();
+    StreamsDatum resultDatum = processor.process(inputDatum).get(0);
+    String resultDocument = (String) resultDatum.getDocument();
 
-        String[] output = new String[1];
-        output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")";
+    String[] output = new String[1];
+    output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")";
 
-        PigTest test;
-        test = new PigTest("src/test/resources/pigprocessdocumentappendtest.pig");
-        test.assertOutput("in", input, "out", output);
+    PigTest test;
+    test = new PigTest("src/test/resources/pigprocessdocumentappendtest.pig");
+    test.assertOutput("in", input, "out", output);
 
-    }
+  }
 }