You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samoa.apache.org by mm...@apache.org on 2015/02/04 16:00:22 UTC

[06/43] incubator-samoa git commit: Format Java source files according to the eclipse-format.xml standard: spaces (no tabs), indent size = 2 spaces, line wrap at 120

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java
----------------------------------------------------------------------
diff --git a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java
index e917844..1db95d0 100644
--- a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java
+++ b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java
@@ -25,118 +25,118 @@ package com.yahoo.labs.samoa.instances;
  */
 
 /**
- *
+ * 
  * @author abifet
  */
-public class SparseInstanceData implements InstanceData{
-    
-    public SparseInstanceData(double[] attributeValues, int[] indexValues, int numberAttributes) {
-       this.attributeValues = attributeValues;
-       this.indexValues = indexValues;
-       this.numberAttributes = numberAttributes;
-    }
-    
-    public SparseInstanceData(int length) {
-       this.attributeValues = new double[length];
-       this.indexValues =  new int[length];
-    }
-    
-    
-    protected double[] attributeValues;
+public class SparseInstanceData implements InstanceData {
 
-    public double[] getAttributeValues() {
-        return attributeValues;
-    }
+  public SparseInstanceData(double[] attributeValues, int[] indexValues, int numberAttributes) {
+    this.attributeValues = attributeValues;
+    this.indexValues = indexValues;
+    this.numberAttributes = numberAttributes;
+  }
 
-    public void setAttributeValues(double[] attributeValues) {
-        this.attributeValues = attributeValues;
-    }
+  public SparseInstanceData(int length) {
+    this.attributeValues = new double[length];
+    this.indexValues = new int[length];
+  }
 
-    public int[] getIndexValues() {
-        return indexValues;
-    }
+  protected double[] attributeValues;
 
-    public void setIndexValues(int[] indexValues) {
-        this.indexValues = indexValues;
-    }
+  public double[] getAttributeValues() {
+    return attributeValues;
+  }
 
-    public int getNumberAttributes() {
-        return numberAttributes;
-    }
+  public void setAttributeValues(double[] attributeValues) {
+    this.attributeValues = attributeValues;
+  }
 
-    public void setNumberAttributes(int numberAttributes) {
-        this.numberAttributes = numberAttributes;
-    }
-    protected int[] indexValues;
-    protected int numberAttributes;
+  public int[] getIndexValues() {
+    return indexValues;
+  }
 
-    @Override
-    public int numAttributes() {
-        return this.numberAttributes;
-    }
+  public void setIndexValues(int[] indexValues) {
+    this.indexValues = indexValues;
+  }
+
+  public int getNumberAttributes() {
+    return numberAttributes;
+  }
+
+  public void setNumberAttributes(int numberAttributes) {
+    this.numberAttributes = numberAttributes;
+  }
+
+  protected int[] indexValues;
+  protected int numberAttributes;
 
-    @Override
-    public double value(int indexAttribute) {
-        int location = locateIndex(indexAttribute);
-        //return location == -1 ? 0 : this.attributeValues[location];
-      //      int index = locateIndex(attIndex);
+  @Override
+  public int numAttributes() {
+    return this.numberAttributes;
+  }
+
+  @Override
+  public double value(int indexAttribute) {
+    int location = locateIndex(indexAttribute);
+    // return location == -1 ? 0 : this.attributeValues[location];
+    // int index = locateIndex(attIndex);
     if ((location >= 0) && (indexValues[location] == indexAttribute)) {
       return attributeValues[location];
     } else {
       return 0.0;
     }
-    }
+  }
 
-    @Override
-    public boolean isMissing(int indexAttribute) {
-        return Double.isNaN(this.value(indexAttribute));
-    }
+  @Override
+  public boolean isMissing(int indexAttribute) {
+    return Double.isNaN(this.value(indexAttribute));
+  }
 
-    @Override
-    public int numValues() {
-        return this.attributeValues.length;
-    }
+  @Override
+  public int numValues() {
+    return this.attributeValues.length;
+  }
 
-    @Override
-    public int index(int indexAttribute) {
-        return this.indexValues[indexAttribute];
-    }
+  @Override
+  public int index(int indexAttribute) {
+    return this.indexValues[indexAttribute];
+  }
 
-    @Override
-    public double valueSparse(int indexAttribute) {
-        return this.attributeValues[indexAttribute];
-    }
+  @Override
+  public double valueSparse(int indexAttribute) {
+    return this.attributeValues[indexAttribute];
+  }
 
-    @Override
-    public boolean isMissingSparse(int indexAttribute) {
-        return Double.isNaN(this.valueSparse(indexAttribute));
-    }
+  @Override
+  public boolean isMissingSparse(int indexAttribute) {
+    return Double.isNaN(this.valueSparse(indexAttribute));
+  }
+
+  /*
+   * @Override public double value(Attribute attribute) { return
+   * value(attribute.index()); }
+   */
 
-    /*@Override
-    public double value(Attribute attribute) {
-        return value(attribute.index());
-    }*/
-
-    @Override
-    public double[] toDoubleArray() {
-        double[] array = new double[numAttributes()];
-        for (int i=0; i<numValues() ; i++) {
-            array[index(i)] = valueSparse(i);
-        }
-        return array;
+  @Override
+  public double[] toDoubleArray() {
+    double[] array = new double[numAttributes()];
+    for (int i = 0; i < numValues(); i++) {
+      array[index(i)] = valueSparse(i);
     }
+    return array;
+  }
 
-    @Override
-    public void setValue(int attributeIndex, double d) {
-        int index = locateIndex(attributeIndex);
-        if (index(index) == attributeIndex) {
-            this.attributeValues[index] = d;
-        } else {
-            // We need to add the value
-        }
+  @Override
+  public void setValue(int attributeIndex, double d) {
+    int index = locateIndex(attributeIndex);
+    if (index(index) == attributeIndex) {
+      this.attributeValues[index] = d;
+    } else {
+      // We need to add the value
     }
-    
-    /**
+  }
+
+  /**
    * Locates the greatest index that is not greater than the given index.
    * 
    * @return the internal index of the attribute index. Returns -1 if no index
@@ -168,5 +168,5 @@ public class SparseInstanceData implements InstanceData{
       return min - 1;
     }
   }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java
----------------------------------------------------------------------
diff --git a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java
index f3dc1b9..dd9df6d 100644
--- a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java
+++ b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java
@@ -21,68 +21,71 @@ package com.yahoo.labs.samoa.instances;
  */
 
 public class Utils {
-    public static int maxIndex(double[] doubles) {
+  public static int maxIndex(double[] doubles) {
 
-        double maximum = 0;
-        int maxIndex = 0;
+    double maximum = 0;
+    int maxIndex = 0;
 
-        for (int i = 0; i < doubles.length; i++) {
-            if ((i == 0) || (doubles[i] > maximum)) {
-                maxIndex = i;
-                maximum = doubles[i];
-            }
-        }
-
-        return maxIndex;
+    for (int i = 0; i < doubles.length; i++) {
+      if ((i == 0) || (doubles[i] > maximum)) {
+        maxIndex = i;
+        maximum = doubles[i];
+      }
     }
 
-    public static String quote(String string) {
-        boolean quote = false;
+    return maxIndex;
+  }
 
-        // backquote the following characters
-        if ((string.indexOf('\n') != -1) || (string.indexOf('\r') != -1) || (string.indexOf('\'') != -1) || (string.indexOf('"') != -1)
-                || (string.indexOf('\\') != -1) || (string.indexOf('\t') != -1) || (string.indexOf('%') != -1) || (string.indexOf('\u001E') != -1)) {
-            string = backQuoteChars(string);
-            quote = true;
-        }
+  public static String quote(String string) {
+    boolean quote = false;
 
-        // Enclose the string in 's if the string contains a recently added
-        // backquote or contains one of the following characters.
-        if ((quote == true) || (string.indexOf('{') != -1) || (string.indexOf('}') != -1) || (string.indexOf(',') != -1) || (string.equals("?"))
-                || (string.indexOf(' ') != -1) || (string.equals(""))) {
-            string = ("'".concat(string)).concat("'");
-        }
+    // backquote the following characters
+    if ((string.indexOf('\n') != -1) || (string.indexOf('\r') != -1) || (string.indexOf('\'') != -1)
+        || (string.indexOf('"') != -1)
+        || (string.indexOf('\\') != -1) || (string.indexOf('\t') != -1) || (string.indexOf('%') != -1)
+        || (string.indexOf('\u001E') != -1)) {
+      string = backQuoteChars(string);
+      quote = true;
+    }
 
-        return string;
+    // Enclose the string in 's if the string contains a recently added
+    // backquote or contains one of the following characters.
+    if ((quote == true) || (string.indexOf('{') != -1) || (string.indexOf('}') != -1) || (string.indexOf(',') != -1)
+        || (string.equals("?"))
+        || (string.indexOf(' ') != -1) || (string.equals(""))) {
+      string = ("'".concat(string)).concat("'");
     }
 
-    public static String backQuoteChars(String string) {
+    return string;
+  }
 
-        int index;
-        StringBuffer newStringBuffer;
+  public static String backQuoteChars(String string) {
 
-        // replace each of the following characters with the backquoted version
-        char charsFind[] = { '\\', '\'', '\t', '\n', '\r', '"', '%', '\u001E' };
-        String charsReplace[] = { "\\\\", "\\'", "\\t", "\\n", "\\r", "\\\"", "\\%", "\\u001E" };
-        for (int i = 0; i < charsFind.length; i++) {
-            if (string.indexOf(charsFind[i]) != -1) {
-                newStringBuffer = new StringBuffer();
-                while ((index = string.indexOf(charsFind[i])) != -1) {
-                    if (index > 0) {
-                        newStringBuffer.append(string.substring(0, index));
-                    }
-                    newStringBuffer.append(charsReplace[i]);
-                    if ((index + 1) < string.length()) {
-                        string = string.substring(index + 1);
-                    } else {
-                        string = "";
-                    }
-                }
-                newStringBuffer.append(string);
-                string = newStringBuffer.toString();
-            }
-        }
+    int index;
+    StringBuffer newStringBuffer;
 
-        return string;
+    // replace each of the following characters with the backquoted version
+    char charsFind[] = { '\\', '\'', '\t', '\n', '\r', '"', '%', '\u001E' };
+    String charsReplace[] = { "\\\\", "\\'", "\\t", "\\n", "\\r", "\\\"", "\\%", "\\u001E" };
+    for (int i = 0; i < charsFind.length; i++) {
+      if (string.indexOf(charsFind[i]) != -1) {
+        newStringBuffer = new StringBuffer();
+        while ((index = string.indexOf(charsFind[i])) != -1) {
+          if (index > 0) {
+            newStringBuffer.append(string.substring(0, index));
+          }
+          newStringBuffer.append(charsReplace[i]);
+          if ((index + 1) < string.length()) {
+            string = string.substring(index + 1);
+          } else {
+            string = "";
+          }
+        }
+        newStringBuffer.append(string);
+        string = newStringBuffer.toString();
+      }
     }
+
+    return string;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java
index 05ee1e1..27ca7a1 100644
--- a/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java
+++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java
@@ -36,54 +36,55 @@ import com.yahoo.labs.samoa.topology.impl.SimpleEngine;
  */
 public class LocalDoTask {
 
-    // TODO: clean up this class for helping ML Developer in SAMOA
-    // TODO: clean up code from storm-impl
-	
-	// It seems that the 3 extra options are not used.
-	// Probably should remove them
-    private static final String SUPPRESS_STATUS_OUT_MSG = "Suppress the task status output. Normally it is sent to stderr.";
-    private static final String SUPPRESS_RESULT_OUT_MSG = "Suppress the task result output. Normally it is sent to stdout.";
-    private static final String STATUS_UPDATE_FREQ_MSG = "Wait time in milliseconds between status updates.";
-    private static final Logger logger = LoggerFactory.getLogger(LocalDoTask.class);
+  // TODO: clean up this class for helping ML Developer in SAMOA
+  // TODO: clean up code from storm-impl
 
-    /**
-     * The main method.
-     * 
-     * @param args
-     *            the arguments
-     */
-    public static void main(String[] args) {
+  // It seems that the 3 extra options are not used.
+  // Probably should remove them
+  private static final String SUPPRESS_STATUS_OUT_MSG = "Suppress the task status output. Normally it is sent to stderr.";
+  private static final String SUPPRESS_RESULT_OUT_MSG = "Suppress the task result output. Normally it is sent to stdout.";
+  private static final String STATUS_UPDATE_FREQ_MSG = "Wait time in milliseconds between status updates.";
+  private static final Logger logger = LoggerFactory.getLogger(LocalDoTask.class);
 
-        // ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
+  /**
+   * The main method.
+   * 
+   * @param args
+   *          the arguments
+   */
+  public static void main(String[] args) {
 
-        // args = tmpArgs.toArray(new String[0]);
+    // ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
 
-        FlagOption suppressStatusOutOpt = new FlagOption("suppressStatusOut", 'S', SUPPRESS_STATUS_OUT_MSG);
+    // args = tmpArgs.toArray(new String[0]);
 
-        FlagOption suppressResultOutOpt = new FlagOption("suppressResultOut", 'R', SUPPRESS_RESULT_OUT_MSG);
+    FlagOption suppressStatusOutOpt = new FlagOption("suppressStatusOut", 'S', SUPPRESS_STATUS_OUT_MSG);
 
-        IntOption statusUpdateFreqOpt = new IntOption("statusUpdateFrequency", 'F', STATUS_UPDATE_FREQ_MSG, 1000, 0, Integer.MAX_VALUE);
+    FlagOption suppressResultOutOpt = new FlagOption("suppressResultOut", 'R', SUPPRESS_RESULT_OUT_MSG);
 
-        Option[] extraOptions = new Option[] { suppressStatusOutOpt, suppressResultOutOpt, statusUpdateFreqOpt };
+    IntOption statusUpdateFreqOpt = new IntOption("statusUpdateFrequency", 'F', STATUS_UPDATE_FREQ_MSG, 1000, 0,
+        Integer.MAX_VALUE);
 
-        StringBuilder cliString = new StringBuilder();
-        for (String arg : args) {
-            cliString.append(" ").append(arg);
-        }
-        logger.debug("Command line string = {}", cliString.toString());
-        System.out.println("Command line string = " + cliString.toString());
+    Option[] extraOptions = new Option[] { suppressStatusOutOpt, suppressResultOutOpt, statusUpdateFreqOpt };
 
-        Task task;
-        try {
-            task = ClassOption.cliStringToObject(cliString.toString(), Task.class, extraOptions);
-            logger.info("Successfully instantiating {}", task.getClass().getCanonicalName());
-        } catch (Exception e) {
-            logger.error("Fail to initialize the task", e);
-            System.out.println("Fail to initialize the task" + e);
-            return;
-        }
-        task.setFactory(new SimpleComponentFactory());
-        task.init();
-        SimpleEngine.submitTopology(task.getTopology());
+    StringBuilder cliString = new StringBuilder();
+    for (String arg : args) {
+      cliString.append(" ").append(arg);
     }
+    logger.debug("Command line string = {}", cliString.toString());
+    System.out.println("Command line string = " + cliString.toString());
+
+    Task task;
+    try {
+      task = ClassOption.cliStringToObject(cliString.toString(), Task.class, extraOptions);
+      logger.info("Successfully instantiating {}", task.getClass().getCanonicalName());
+    } catch (Exception e) {
+      logger.error("Fail to initialize the task", e);
+      System.out.println("Fail to initialize the task" + e);
+      return;
+    }
+    task.setFactory(new SimpleComponentFactory());
+    task.init();
+    SimpleEngine.submitTopology(task.getTopology());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java
index b289dbe..0c2f301 100644
--- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java
+++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java
@@ -31,23 +31,23 @@ import com.yahoo.labs.samoa.topology.Topology;
 
 public class SimpleComponentFactory implements ComponentFactory {
 
-    public ProcessingItem createPi(Processor processor, int paralellism) {
-        return new SimpleProcessingItem(processor, paralellism);
-    }
+  public ProcessingItem createPi(Processor processor, int paralellism) {
+    return new SimpleProcessingItem(processor, paralellism);
+  }
 
-    public ProcessingItem createPi(Processor processor) {
-        return this.createPi(processor, 1);
-    }
+  public ProcessingItem createPi(Processor processor) {
+    return this.createPi(processor, 1);
+  }
 
-    public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
-        return new SimpleEntranceProcessingItem(processor);
-    }
+  public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
+    return new SimpleEntranceProcessingItem(processor);
+  }
 
-    public Stream createStream(IProcessingItem sourcePi) {
-        return new SimpleStream(sourcePi);
-    }
+  public Stream createStream(IProcessingItem sourcePi) {
+    return new SimpleStream(sourcePi);
+  }
 
-    public Topology createTopology(String topoName) {
-        return new SimpleTopology(topoName);
-    }
+  public Topology createTopology(String topoName) {
+    return new SimpleTopology(topoName);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java
index 9d131e1..5ca5837 100644
--- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java
+++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java
@@ -28,10 +28,10 @@ import com.yahoo.labs.samoa.topology.Topology;
 
 public class SimpleEngine {
 
-    public static void submitTopology(Topology topology) {
-        SimpleTopology simpleTopology = (SimpleTopology) topology;
-        simpleTopology.run();
-        // runs until completion
-    }
+  public static void submitTopology(Topology topology) {
+    SimpleTopology simpleTopology = (SimpleTopology) topology;
+    simpleTopology.run();
+    // runs until completion
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java
index 4652ebb..c9cc601 100644
--- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java
+++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java
@@ -24,10 +24,10 @@ import com.yahoo.labs.samoa.core.EntranceProcessor;
 import com.yahoo.labs.samoa.topology.LocalEntranceProcessingItem;
 
 class SimpleEntranceProcessingItem extends LocalEntranceProcessingItem {
-    public SimpleEntranceProcessingItem(EntranceProcessor processor) {
-        super(processor);
-    }
-    
-    // The default waiting time when there is no available events is 100ms
-    // Override waitForNewEvents() to change it
+  public SimpleEntranceProcessingItem(EntranceProcessor processor) {
+    super(processor);
+  }
+
+  // The default waiting time when there is no available events is 100ms
+  // Override waitForNewEvents() to change it
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java
index e3cc765..77361b1 100644
--- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java
+++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java
@@ -34,54 +34,54 @@ import com.yahoo.labs.samoa.utils.PartitioningScheme;
 import com.yahoo.labs.samoa.utils.StreamDestination;
 
 /**
- *
+ * 
  * @author abifet
  */
 class SimpleProcessingItem extends AbstractProcessingItem {
-    private IProcessingItem[] arrayProcessingItem;
+  private IProcessingItem[] arrayProcessingItem;
 
-    SimpleProcessingItem(Processor processor) {
-        super(processor);
-    }
-    
-    SimpleProcessingItem(Processor processor, int parallelism) {
-    	super(processor);
-        this.setParallelism(parallelism);
-    }
-    
-    public IProcessingItem getProcessingItem(int i) {
-        return arrayProcessingItem[i];
-    }
-    
-    @Override
-    protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) {
-		StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme);
-		((SimpleStream)inputStream).addDestination(destination);
-		return this;
-	}
+  SimpleProcessingItem(Processor processor) {
+    super(processor);
+  }
 
-    public SimpleProcessingItem copy() {
-    	Processor processor = this.getProcessor();
-        return new SimpleProcessingItem(processor.newProcessor(processor));
-    }
+  SimpleProcessingItem(Processor processor, int parallelism) {
+    super(processor);
+    this.setParallelism(parallelism);
+  }
 
-    public void processEvent(ContentEvent event, int counter) {
-    	
-        int parallelism = this.getParallelism();
-        //System.out.println("Process event "+event+" (isLast="+event.isLastEvent()+") with counter="+counter+" while parallelism="+parallelism);
-        if (this.arrayProcessingItem == null && parallelism > 0) {
-            //Init processing elements, the first time they are needed
-            this.arrayProcessingItem = new IProcessingItem[parallelism];
-            for (int j = 0; j < parallelism; j++) {
-                arrayProcessingItem[j] = this.copy();
-                arrayProcessingItem[j].getProcessor().onCreate(j);
-            }
-        }
-        if (this.arrayProcessingItem != null) {
-        	IProcessingItem pi = this.getProcessingItem(counter);
-        	Processor p = pi.getProcessor();
-        	//System.out.println("PI="+pi+", p="+p);
-            this.getProcessingItem(counter).getProcessor().process(event);
-        }
+  public IProcessingItem getProcessingItem(int i) {
+    return arrayProcessingItem[i];
+  }
+
+  @Override
+  protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) {
+    StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme);
+    ((SimpleStream) inputStream).addDestination(destination);
+    return this;
+  }
+
+  public SimpleProcessingItem copy() {
+    Processor processor = this.getProcessor();
+    return new SimpleProcessingItem(processor.newProcessor(processor));
+  }
+
+  public void processEvent(ContentEvent event, int counter) {
+
+    int parallelism = this.getParallelism();
+    // System.out.println("Process event "+event+" (isLast="+event.isLastEvent()+") with counter="+counter+" while parallelism="+parallelism);
+    if (this.arrayProcessingItem == null && parallelism > 0) {
+      // Init processing elements, the first time they are needed
+      this.arrayProcessingItem = new IProcessingItem[parallelism];
+      for (int j = 0; j < parallelism; j++) {
+        arrayProcessingItem[j] = this.copy();
+        arrayProcessingItem[j].getProcessor().onCreate(j);
+      }
+    }
+    if (this.arrayProcessingItem != null) {
+      IProcessingItem pi = this.getProcessingItem(counter);
+      Processor p = pi.getProcessor();
+      // System.out.println("PI="+pi+", p="+p);
+      this.getProcessingItem(counter).getProcessor().process(event);
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java
index 74684a7..09dc555 100644
--- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java
+++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java
@@ -38,56 +38,58 @@ import com.yahoo.labs.samoa.utils.StreamDestination;
  * @author abifet
  */
 class SimpleStream extends AbstractStream {
-    private List<StreamDestination> destinations;
-    private int maxCounter;
-    private int eventCounter;
+  private List<StreamDestination> destinations;
+  private int maxCounter;
+  private int eventCounter;
 
-    SimpleStream(IProcessingItem sourcePi) {
-    	super(sourcePi);
-    	this.destinations = new LinkedList<>();
-    	this.eventCounter = 0;
-    	this.maxCounter = 1;
-    }
+  SimpleStream(IProcessingItem sourcePi) {
+    super(sourcePi);
+    this.destinations = new LinkedList<>();
+    this.eventCounter = 0;
+    this.maxCounter = 1;
+  }
 
-    private int getNextCounter() {
-    	if (maxCounter > 0 && eventCounter >= maxCounter) eventCounter = 0;
-    	this.eventCounter++;
-    	return this.eventCounter;
-    }
+  private int getNextCounter() {
+    if (maxCounter > 0 && eventCounter >= maxCounter)
+      eventCounter = 0;
+    this.eventCounter++;
+    return this.eventCounter;
+  }
 
-    @Override
-    public void put(ContentEvent event) {
-    	this.put(event, this.getNextCounter());
-    }
-    
-    private void put(ContentEvent event, int counter) {
-    	SimpleProcessingItem pi;
-        int parallelism;
-        for (StreamDestination destination:destinations) {
-            pi = (SimpleProcessingItem) destination.getProcessingItem();
-            parallelism = destination.getParallelism();
-            switch (destination.getPartitioningScheme()) {
-            case SHUFFLE:
-                pi.processEvent(event, counter % parallelism);
-                break;
-            case GROUP_BY_KEY:
-                HashCodeBuilder hb = new HashCodeBuilder();
-                hb.append(event.getKey());
-                int key = hb.build() % parallelism;
-                pi.processEvent(event, key);
-                break;
-            case BROADCAST:
-                for (int p = 0; p < parallelism; p++) {
-                    pi.processEvent(event, p);
-                }
-                break;
-            }
+  @Override
+  public void put(ContentEvent event) {
+    this.put(event, this.getNextCounter());
+  }
+
+  private void put(ContentEvent event, int counter) {
+    SimpleProcessingItem pi;
+    int parallelism;
+    for (StreamDestination destination : destinations) {
+      pi = (SimpleProcessingItem) destination.getProcessingItem();
+      parallelism = destination.getParallelism();
+      switch (destination.getPartitioningScheme()) {
+      case SHUFFLE:
+        pi.processEvent(event, counter % parallelism);
+        break;
+      case GROUP_BY_KEY:
+        HashCodeBuilder hb = new HashCodeBuilder();
+        hb.append(event.getKey());
+        int key = hb.build() % parallelism;
+        pi.processEvent(event, key);
+        break;
+      case BROADCAST:
+        for (int p = 0; p < parallelism; p++) {
+          pi.processEvent(event, p);
         }
+        break;
+      }
     }
+  }
 
-    public void addDestination(StreamDestination destination) {
-        this.destinations.add(destination);
-        if (maxCounter <= 0) maxCounter = 1;
-        maxCounter *= destination.getParallelism();
-    }
+  public void addDestination(StreamDestination destination) {
+    this.destinations.add(destination);
+    if (maxCounter <= 0)
+      maxCounter = 1;
+    maxCounter *= destination.getParallelism();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java
index 675b4ac..e7fddbd 100644
--- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java
+++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java
@@ -27,18 +27,21 @@ package com.yahoo.labs.samoa.topology.impl;
 import com.yahoo.labs.samoa.topology.AbstractTopology;
 
 public class SimpleTopology extends AbstractTopology {
-	SimpleTopology(String name) {
-		super(name);
-	}
+  SimpleTopology(String name) {
+    super(name);
+  }
 
-	public void run() {
-    	if (this.getEntranceProcessingItems() == null)
-    		throw new IllegalStateException("You need to set entrance PI before running the topology.");
-    	if (this.getEntranceProcessingItems().size() != 1)
-    		throw new IllegalStateException("SimpleTopology supports 1 entrance PI only. Number of entrance PIs is "+this.getEntranceProcessingItems().size());
-    	
-    	SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) this.getEntranceProcessingItems().toArray()[0];
-    	entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple mode
-        entrancePi.startSendingEvents();
-    }
+  public void run() {
+    if (this.getEntranceProcessingItems() == null)
+      throw new IllegalStateException("You need to set entrance PI before running the topology.");
+    if (this.getEntranceProcessingItems().size() != 1)
+      throw new IllegalStateException("SimpleTopology supports 1 entrance PI only. Number of entrance PIs is "
+          + this.getEntranceProcessingItems().size());
+
+    SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) this.getEntranceProcessingItems()
+        .toArray()[0];
+    entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple
+                                           // mode
+    entrancePi.startSendingEvents();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
index 9bf1c2d..d3e54a8 100644
--- a/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
+++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
@@ -24,64 +24,63 @@ import org.junit.Test;
 
 public class AlgosTest {
 
+  @Test
+  public void testVHTLocal() throws Exception {
 
-    @Test
-    public void testVHTLocal() throws Exception {
+    TestParams vhtConfig = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(200_000)
+        .classifiedInstances(200_000)
+        .classificationsCorrect(75f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE)
+        .resultFilePollTimeout(10)
+        .prePollWait(10)
+        .taskClassName(LocalDoTask.class.getName())
+        .build();
+    TestUtils.test(vhtConfig);
 
-        TestParams vhtConfig = new TestParams.Builder()
-                .inputInstances(200_000)
-                .samplingSize(20_000)
-                .evaluationInstances(200_000)
-                .classifiedInstances(200_000)
-                .classificationsCorrect(75f)
-                .kappaStat(0f)
-                .kappaTempStat(0f)
-                .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE)
-                .resultFilePollTimeout(10)
-                .prePollWait(10)
-                .taskClassName(LocalDoTask.class.getName())
-                .build();
-        TestUtils.test(vhtConfig);
+  }
 
-    }
+  @Test
+  public void testBaggingLocal() throws Exception {
+    TestParams baggingConfig = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(180_000)
+        .classifiedInstances(210_000)
+        .classificationsCorrect(60f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE)
+        .prePollWait(10)
+        .resultFilePollTimeout(10)
+        .taskClassName(LocalDoTask.class.getName())
+        .build();
+    TestUtils.test(baggingConfig);
 
-    @Test
-    public void testBaggingLocal() throws Exception {
-        TestParams baggingConfig = new TestParams.Builder()
-                .inputInstances(200_000)
-                .samplingSize(20_000)
-                .evaluationInstances(180_000)
-                .classifiedInstances(210_000)
-                .classificationsCorrect(60f)
-                .kappaStat(0f)
-                .kappaTempStat(0f)
-                .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE)
-                .prePollWait(10)
-                .resultFilePollTimeout(10)
-                .taskClassName(LocalDoTask.class.getName())
-                .build();
-        TestUtils.test(baggingConfig);
+  }
 
-    }
+  @Test
+  public void testNaiveBayesLocal() throws Exception {
 
-    @Test
-    public void testNaiveBayesLocal() throws Exception {
+    TestParams vhtConfig = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(200_000)
+        .classifiedInstances(200_000)
+        .classificationsCorrect(65f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_NAIVEBAYES_HYPERPLANE)
+        .resultFilePollTimeout(10)
+        .prePollWait(10)
+        .taskClassName(LocalDoTask.class.getName())
+        .build();
+    TestUtils.test(vhtConfig);
 
-        TestParams vhtConfig = new TestParams.Builder()
-                .inputInstances(200_000)
-                .samplingSize(20_000)
-                .evaluationInstances(200_000)
-                .classifiedInstances(200_000)
-                .classificationsCorrect(65f)
-                .kappaStat(0f)
-                .kappaTempStat(0f)
-                .cliStringTemplate(TestParams.Templates.PREQEVAL_NAIVEBAYES_HYPERPLANE)
-                .resultFilePollTimeout(10)
-                .prePollWait(10)
-                .taskClassName(LocalDoTask.class.getName())
-                .build();
-        TestUtils.test(vhtConfig);
-
-    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java
index 02a9295..bfd6fe1 100644
--- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java
+++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java
@@ -36,61 +36,64 @@ import com.yahoo.labs.samoa.topology.Topology;
 
 /**
  * @author Anh Thu Vu
- *
+ * 
  */
 public class SimpleComponentFactoryTest {
 
-	@Tested private SimpleComponentFactory factory;
-	@Mocked private Processor processor, processorReplica;
-	@Mocked private EntranceProcessor entranceProcessor;
-	
-	private final int parallelism = 3;
-	private final String topoName = "TestTopology";
-	
+  @Tested
+  private SimpleComponentFactory factory;
+  @Mocked
+  private Processor processor, processorReplica;
+  @Mocked
+  private EntranceProcessor entranceProcessor;
+
+  private final int parallelism = 3;
+  private final String topoName = "TestTopology";
+
+  @Before
+  public void setUp() throws Exception {
+    factory = new SimpleComponentFactory();
+  }
+
+  @Test
+  public void testCreatePiNoParallelism() {
+    ProcessingItem pi = factory.createPi(processor);
+    assertNotNull("ProcessingItem created is null.", pi);
+    assertEquals("ProcessingItem created is not a SimpleProcessingItem.", SimpleProcessingItem.class, pi.getClass());
+    assertEquals("Parallelism of PI is not 1", 1, pi.getParallelism(), 0);
+  }
+
+  @Test
+  public void testCreatePiWithParallelism() {
+    ProcessingItem pi = factory.createPi(processor, parallelism);
+    assertNotNull("ProcessingItem created is null.", pi);
+    assertEquals("ProcessingItem created is not a SimpleProcessingItem.", SimpleProcessingItem.class, pi.getClass());
+    assertEquals("Parallelism of PI is not ", parallelism, pi.getParallelism(), 0);
+  }
+
+  @Test
+  public void testCreateStream() {
+    ProcessingItem pi = factory.createPi(processor);
+
+    Stream stream = factory.createStream(pi);
+    assertNotNull("Stream created is null", stream);
+    assertEquals("Stream created is not a SimpleStream.", SimpleStream.class, stream.getClass());
+  }
 
-	@Before
-	public void setUp() throws Exception {
-		factory = new SimpleComponentFactory();
-	}
+  @Test
+  public void testCreateTopology() {
+    Topology topology = factory.createTopology(topoName);
+    assertNotNull("Topology created is null.", topology);
+    assertEquals("Topology created is not a SimpleTopology.", SimpleTopology.class, topology.getClass());
+  }
 
-	@Test
-	public void testCreatePiNoParallelism() {
-		ProcessingItem pi = factory.createPi(processor);
-		assertNotNull("ProcessingItem created is null.",pi);
-		assertEquals("ProcessingItem created is not a SimpleProcessingItem.",SimpleProcessingItem.class,pi.getClass());
-		assertEquals("Parallelism of PI is not 1",1,pi.getParallelism(),0);
-	}
-	
-	@Test
-	public void testCreatePiWithParallelism() {
-		ProcessingItem pi = factory.createPi(processor,parallelism);
-		assertNotNull("ProcessingItem created is null.",pi);
-		assertEquals("ProcessingItem created is not a SimpleProcessingItem.",SimpleProcessingItem.class,pi.getClass());
-		assertEquals("Parallelism of PI is not ",parallelism,pi.getParallelism(),0);
-	}
-	
-	@Test
-	public void testCreateStream() {
-		ProcessingItem pi = factory.createPi(processor);
-		
-		Stream stream = factory.createStream(pi);
-		assertNotNull("Stream created is null",stream);
-		assertEquals("Stream created is not a SimpleStream.",SimpleStream.class,stream.getClass());
-	}
-	
-	@Test
-	public void testCreateTopology() {
-		Topology topology = factory.createTopology(topoName);
-		assertNotNull("Topology created is null.",topology);
-		assertEquals("Topology created is not a SimpleTopology.",SimpleTopology.class,topology.getClass());
-	}
-	
-	@Test
-	public void testCreateEntrancePi() {
-		EntranceProcessingItem entrancePi = factory.createEntrancePi(entranceProcessor);
-		assertNotNull("EntranceProcessingItem created is null.",entrancePi);
-		assertEquals("EntranceProcessingItem created is not a SimpleEntranceProcessingItem.",SimpleEntranceProcessingItem.class,entrancePi.getClass());
-		assertSame("EntranceProcessor is not set correctly.",entranceProcessor, entrancePi.getProcessor());
-	}
+  @Test
+  public void testCreateEntrancePi() {
+    EntranceProcessingItem entrancePi = factory.createEntrancePi(entranceProcessor);
+    assertNotNull("EntranceProcessingItem created is null.", entrancePi);
+    assertEquals("EntranceProcessingItem created is not a SimpleEntranceProcessingItem.",
+        SimpleEntranceProcessingItem.class, entrancePi.getClass());
+    assertSame("EntranceProcessor is not set correctly.", entranceProcessor, entrancePi.getProcessor());
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java
index c4649ed..23b38b4 100644
--- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java
+++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java
@@ -29,29 +29,32 @@ import org.junit.Test;
 
 /**
  * @author Anh Thu Vu
- *
+ * 
  */
 public class SimpleEngineTest {
 
-	@Tested private SimpleEngine unused;
-	@Mocked private SimpleTopology topology;
-	@Mocked private Runtime mockedRuntime;
-	
-	@Test
-	public void testSubmitTopology() {
-		new NonStrictExpectations() {
-			{
-				Runtime.getRuntime();
-				result=mockedRuntime;
-				mockedRuntime.exit(0);
-			}
-		};
-		SimpleEngine.submitTopology(topology);
-		new Verifications() {
-			{
-				topology.run();
-			}
-		};
-	}
+  @Tested
+  private SimpleEngine unused;
+  @Mocked
+  private SimpleTopology topology;
+  @Mocked
+  private Runtime mockedRuntime;
+
+  @Test
+  public void testSubmitTopology() {
+    new NonStrictExpectations() {
+      {
+        Runtime.getRuntime();
+        result = mockedRuntime;
+        mockedRuntime.exit(0);
+      }
+    };
+    SimpleEngine.submitTopology(topology);
+    new Verifications() {
+      {
+        topology.run();
+      }
+    };
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
index 41ae22b..0c1e475 100644
--- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
+++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
@@ -36,118 +36,137 @@ import com.yahoo.labs.samoa.topology.Stream;
 
 /**
  * @author Anh Thu Vu
- *
+ * 
  */
 public class SimpleEntranceProcessingItemTest {
 
-	@Tested private SimpleEntranceProcessingItem entrancePi;
-	
-	@Mocked private EntranceProcessor entranceProcessor;
-	@Mocked private Stream outputStream, anotherStream;
-	@Mocked private ContentEvent event;
-	
-	@Mocked private Thread unused;
-	
-	/**
-	 * @throws java.lang.Exception
-	 */
-	@Before
-	public void setUp() throws Exception {
-		entrancePi = new SimpleEntranceProcessingItem(entranceProcessor);
-	}
-
-	@Test
-	public void testContructor() {
-		assertSame("EntranceProcessor is not set correctly.",entranceProcessor,entrancePi.getProcessor());
-	}
-	
-	@Test
-	public void testSetOutputStream() {
-		entrancePi.setOutputStream(outputStream);
-		assertSame("OutputStream is not set correctly.",outputStream,entrancePi.getOutputStream());
-	}
-	
-	@Test
-	public void testSetOutputStreamRepeate() {
-		entrancePi.setOutputStream(outputStream);
-		entrancePi.setOutputStream(outputStream);
-		assertSame("OutputStream is not set correctly.",outputStream,entrancePi.getOutputStream());
-	}
-	
-	@Test(expected=IllegalStateException.class)
-	public void testSetOutputStreamError() {
-		entrancePi.setOutputStream(outputStream);
-		entrancePi.setOutputStream(anotherStream);
-	}
-	
-	@Test
-	public void testInjectNextEventSuccess() {
-		entrancePi.setOutputStream(outputStream);
-		new StrictExpectations() {
-			{
-				entranceProcessor.hasNext();
-				result=true;
-				
-				entranceProcessor.nextEvent();
-				result=event;
-			}
-		};
-		entrancePi.injectNextEvent();
-		new Verifications() {
-			{
-				outputStream.put(event);
-			}
-		};
-	}
-	
-	@Test
-	public void testStartSendingEvents() {
-		entrancePi.setOutputStream(outputStream);
-		new StrictExpectations() {
-			{
-				for (int i=0; i<1; i++) {
-					entranceProcessor.isFinished(); result=false;
-					entranceProcessor.hasNext(); result=false;
-				}
-				
-				for (int i=0; i<5; i++) {
-					entranceProcessor.isFinished(); result=false;
-					entranceProcessor.hasNext(); result=true;
-					entranceProcessor.nextEvent(); result=event;
-					outputStream.put(event);
-				}
-				
-				for (int i=0; i<2; i++) {
-					entranceProcessor.isFinished(); result=false;
-					entranceProcessor.hasNext(); result=false;
-				}
-				
-				for (int i=0; i<5; i++) {
-					entranceProcessor.isFinished(); result=false;
-					entranceProcessor.hasNext(); result=true;
-					entranceProcessor.nextEvent(); result=event;
-					outputStream.put(event);
-				}
-
-				entranceProcessor.isFinished(); result=true; times=1;
-				entranceProcessor.hasNext(); times=0;
-			}
-		};
-		entrancePi.startSendingEvents();
-		new Verifications() {
-			{
-				try {
-					Thread.sleep(anyInt); times=3;
-				} catch (InterruptedException e) {
-
-				}
-			}
-		};
-	}
-	
-	@Test(expected=IllegalStateException.class)
-	public void testStartSendingEventsError() {
-		entrancePi.startSendingEvents();
-	}
+  @Tested
+  private SimpleEntranceProcessingItem entrancePi;
+
+  @Mocked
+  private EntranceProcessor entranceProcessor;
+  @Mocked
+  private Stream outputStream, anotherStream;
+  @Mocked
+  private ContentEvent event;
+
+  @Mocked
+  private Thread unused;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    entrancePi = new SimpleEntranceProcessingItem(entranceProcessor);
+  }
+
+  @Test
+  public void testContructor() {
+    assertSame("EntranceProcessor is not set correctly.", entranceProcessor, entrancePi.getProcessor());
+  }
+
+  @Test
+  public void testSetOutputStream() {
+    entrancePi.setOutputStream(outputStream);
+    assertSame("OutputStream is not set correctly.", outputStream, entrancePi.getOutputStream());
+  }
+
+  @Test
+  public void testSetOutputStreamRepeate() {
+    entrancePi.setOutputStream(outputStream);
+    entrancePi.setOutputStream(outputStream);
+    assertSame("OutputStream is not set correctly.", outputStream, entrancePi.getOutputStream());
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSetOutputStreamError() {
+    entrancePi.setOutputStream(outputStream);
+    entrancePi.setOutputStream(anotherStream);
+  }
+
+  @Test
+  public void testInjectNextEventSuccess() {
+    entrancePi.setOutputStream(outputStream);
+    new StrictExpectations() {
+      {
+        entranceProcessor.hasNext();
+        result = true;
+
+        entranceProcessor.nextEvent();
+        result = event;
+      }
+    };
+    entrancePi.injectNextEvent();
+    new Verifications() {
+      {
+        outputStream.put(event);
+      }
+    };
+  }
+
+  @Test
+  public void testStartSendingEvents() {
+    entrancePi.setOutputStream(outputStream);
+    new StrictExpectations() {
+      {
+        for (int i = 0; i < 1; i++) {
+          entranceProcessor.isFinished();
+          result = false;
+          entranceProcessor.hasNext();
+          result = false;
+        }
+
+        for (int i = 0; i < 5; i++) {
+          entranceProcessor.isFinished();
+          result = false;
+          entranceProcessor.hasNext();
+          result = true;
+          entranceProcessor.nextEvent();
+          result = event;
+          outputStream.put(event);
+        }
+
+        for (int i = 0; i < 2; i++) {
+          entranceProcessor.isFinished();
+          result = false;
+          entranceProcessor.hasNext();
+          result = false;
+        }
+
+        for (int i = 0; i < 5; i++) {
+          entranceProcessor.isFinished();
+          result = false;
+          entranceProcessor.hasNext();
+          result = true;
+          entranceProcessor.nextEvent();
+          result = event;
+          outputStream.put(event);
+        }
+
+        entranceProcessor.isFinished();
+        result = true;
+        times = 1;
+        entranceProcessor.hasNext();
+        times = 0;
+      }
+    };
+    entrancePi.startSendingEvents();
+    new Verifications() {
+      {
+        try {
+          Thread.sleep(anyInt);
+          times = 3;
+        } catch (InterruptedException e) {
+
+        }
+      }
+    };
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testStartSendingEventsError() {
+    entrancePi.startSendingEvents();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java
index a4a288a..caa82bf 100644
--- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java
+++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java
@@ -40,81 +40,85 @@ import com.yahoo.labs.samoa.utils.StreamDestination;
 
 /**
  * @author Anh Thu Vu
- *
+ * 
  */
 public class SimpleProcessingItemTest {
 
-	@Tested private SimpleProcessingItem pi;
-	
-	@Mocked private Processor processor;
-	@Mocked private SimpleStream stream;
-	@Mocked private StreamDestination destination;
-	@Mocked private ContentEvent event;
-	
-	private final int parallelism = 4;
-	private final int counter = 2;
-	
-	
-	@Before
-	public void setUp() throws Exception {
-		pi = new SimpleProcessingItem(processor, parallelism);
-	}
-
-	@Test
-	public void testConstructor() {
-		assertSame("Processor was not set correctly.",processor,pi.getProcessor());
-		assertEquals("Parallelism was not set correctly.",parallelism,pi.getParallelism(),0);
-	}
-	
-	@Test
-	public void testConnectInputShuffleStream() {
-		new Expectations() {
-			{
-				destination = new StreamDestination(pi, parallelism, PartitioningScheme.SHUFFLE);
-				stream.addDestination(destination);
-			}
-		};
-		pi.connectInputShuffleStream(stream);
-	}
-	
-	@Test
-	public void testConnectInputKeyStream() {
-		new Expectations() {
-			{
-				destination = new StreamDestination(pi, parallelism, PartitioningScheme.GROUP_BY_KEY);
-				stream.addDestination(destination);
-			}
-		};
-		pi.connectInputKeyStream(stream);
-	}
-	
-	@Test
-	public void testConnectInputAllStream() {
-		new Expectations() {
-			{
-				destination = new StreamDestination(pi, parallelism, PartitioningScheme.BROADCAST);
-				stream.addDestination(destination);
-			}
-		};
-		pi.connectInputAllStream(stream);
-	}
-	
-	@Test
-	public void testProcessEvent() {
-		new Expectations() {
-			{
-				for (int i=0; i<parallelism; i++) {
-					processor.newProcessor(processor);
-					result=processor;
-				
-					processor.onCreate(anyInt);
-				}
-				
-				processor.process(event);
-			}
-		};
-		pi.processEvent(event, counter);
-		
-	}
+  @Tested
+  private SimpleProcessingItem pi;
+
+  @Mocked
+  private Processor processor;
+  @Mocked
+  private SimpleStream stream;
+  @Mocked
+  private StreamDestination destination;
+  @Mocked
+  private ContentEvent event;
+
+  private final int parallelism = 4;
+  private final int counter = 2;
+
+  @Before
+  public void setUp() throws Exception {
+    pi = new SimpleProcessingItem(processor, parallelism);
+  }
+
+  @Test
+  public void testConstructor() {
+    assertSame("Processor was not set correctly.", processor, pi.getProcessor());
+    assertEquals("Parallelism was not set correctly.", parallelism, pi.getParallelism(), 0);
+  }
+
+  @Test
+  public void testConnectInputShuffleStream() {
+    new Expectations() {
+      {
+        destination = new StreamDestination(pi, parallelism, PartitioningScheme.SHUFFLE);
+        stream.addDestination(destination);
+      }
+    };
+    pi.connectInputShuffleStream(stream);
+  }
+
+  @Test
+  public void testConnectInputKeyStream() {
+    new Expectations() {
+      {
+        destination = new StreamDestination(pi, parallelism, PartitioningScheme.GROUP_BY_KEY);
+        stream.addDestination(destination);
+      }
+    };
+    pi.connectInputKeyStream(stream);
+  }
+
+  @Test
+  public void testConnectInputAllStream() {
+    new Expectations() {
+      {
+        destination = new StreamDestination(pi, parallelism, PartitioningScheme.BROADCAST);
+        stream.addDestination(destination);
+      }
+    };
+    pi.connectInputAllStream(stream);
+  }
+
+  @Test
+  public void testProcessEvent() {
+    new Expectations() {
+      {
+        for (int i = 0; i < parallelism; i++) {
+          processor.newProcessor(processor);
+          result = processor;
+
+          processor.onCreate(anyInt);
+        }
+
+        processor.process(event);
+      }
+    };
+    pi.processEvent(event, counter);
+
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java
index 2a625b5..c8f6c5d 100644
--- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java
+++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java
@@ -40,72 +40,82 @@ import com.yahoo.labs.samoa.utils.StreamDestination;
 
 /**
  * @author Anh Thu Vu
- *
+ * 
  */
 @RunWith(Parameterized.class)
 public class SimpleStreamTest {
 
-	@Tested private SimpleStream stream;
-	
-	@Mocked private SimpleProcessingItem sourcePi, destPi;
-	@Mocked private ContentEvent event;
-	@Mocked private StreamDestination destination;
+  @Tested
+  private SimpleStream stream;
+
+  @Mocked
+  private SimpleProcessingItem sourcePi, destPi;
+  @Mocked
+  private ContentEvent event;
+  @Mocked
+  private StreamDestination destination;
+
+  private final String eventKey = "eventkey";
+  private final int parallelism;
+  private final PartitioningScheme scheme;
+
+  @Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][] {
+        { 2, PartitioningScheme.SHUFFLE },
+        { 3, PartitioningScheme.GROUP_BY_KEY },
+        { 4, PartitioningScheme.BROADCAST }
+    });
+  }
+
+  public SimpleStreamTest(int parallelism, PartitioningScheme scheme) {
+    this.parallelism = parallelism;
+    this.scheme = scheme;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    stream = new SimpleStream(sourcePi);
+    stream.addDestination(destination);
+  }
 
-	private final String eventKey = "eventkey";
-	private final int parallelism;
-	private final PartitioningScheme scheme;
-	
-	
-	@Parameters
-	public static Collection<Object[]> generateParameters() {
-		return Arrays.asList(new Object[][] {
-			 { 2, PartitioningScheme.SHUFFLE },
-			 { 3, PartitioningScheme.GROUP_BY_KEY },
-			 { 4, PartitioningScheme.BROADCAST }
-		});
-	}
-	
-	public SimpleStreamTest(int parallelism, PartitioningScheme scheme) {
-		this.parallelism = parallelism;
-		this.scheme = scheme;
-	}
-	
-	@Before
-	public void setUp() throws Exception {
-		stream = new SimpleStream(sourcePi);
-		stream.addDestination(destination);
-	}
+  @Test
+  public void testPut() {
+    new NonStrictExpectations() {
+      {
+        event.getKey();
+        result = eventKey;
+        destination.getProcessingItem();
+        result = destPi;
+        destination.getPartitioningScheme();
+        result = scheme;
+        destination.getParallelism();
+        result = parallelism;
 
-	@Test
-	public void testPut() {
-		new NonStrictExpectations() {
-			{
-				event.getKey(); result=eventKey;
-				destination.getProcessingItem(); result=destPi;
-				destination.getPartitioningScheme(); result=scheme;
-				destination.getParallelism(); result=parallelism;
-				
-			}
-		};
-		switch(this.scheme) {
-		case SHUFFLE: case GROUP_BY_KEY:
-			new Expectations() {
-				{
-					// TODO: restrict the range of counter value
-					destPi.processEvent(event, anyInt); times=1;
-				}
-			};
-			break;
-		case BROADCAST:
-			new Expectations() {
-				{
-					// TODO: restrict the range of counter value
-					destPi.processEvent(event, anyInt); times=parallelism;
-				}
-			};
-			break;
-		}
-		stream.put(event);
-	}
+      }
+    };
+    switch (this.scheme) {
+    case SHUFFLE:
+    case GROUP_BY_KEY:
+      new Expectations() {
+        {
+          // TODO: restrict the range of counter value
+          destPi.processEvent(event, anyInt);
+          times = 1;
+        }
+      };
+      break;
+    case BROADCAST:
+      new Expectations() {
+        {
+          // TODO: restrict the range of counter value
+          destPi.processEvent(event, anyInt);
+          times = parallelism;
+        }
+      };
+      break;
+    }
+    stream.put(event);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java
index 2423778..418ad14 100644
--- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java
+++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java
@@ -31,63 +31,64 @@ import org.junit.Test;
  * #L%
  */
 
-
-
 import com.yahoo.labs.samoa.core.EntranceProcessor;
 import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
 
 /**
  * @author Anh Thu Vu
- *
+ * 
  */
 public class SimpleTopologyTest {
 
-	@Tested private SimpleTopology topology;
-	
-	@Mocked private SimpleEntranceProcessingItem entrancePi;
-	@Mocked private EntranceProcessor entranceProcessor;
-	
-	@Before
-	public void setUp() throws Exception {
-		topology = new SimpleTopology("TestTopology");
-	}
-
-	@Test
-	public void testAddEntrancePi() {
-		topology.addEntranceProcessingItem(entrancePi);
-		
-		Set<EntranceProcessingItem> entrancePIs = topology.getEntranceProcessingItems();
-		assertNotNull("Set of entrance PIs is null.",entrancePIs);
-		assertEquals("Number of entrance PI in SimpleTopology must be 1",1,entrancePIs.size());
-		assertSame("Entrance PI was not set correctly.",entrancePi,entrancePIs.toArray()[0]);
-		// TODO: verify that entrance PI is in the set of ProcessingItems
-		// Need to access topology's set of PIs (getProcessingItems() method)
-	}
-	
-	@Test
-	public void testRun() {
-		topology.addEntranceProcessingItem(entrancePi);
-		
-		new NonStrictExpectations() {
-			{
-				entrancePi.getProcessor();
-				result=entranceProcessor;
-				
-			}
-		};
-		
-		new Expectations() {
-			{
-				entranceProcessor.onCreate(anyInt);	
-				entrancePi.startSendingEvents();
-			}
-		};
-		topology.run();
-	}
-	
-	@Test(expected=IllegalStateException.class)
-	public void testRunWithoutEntrancePI() {
-		topology.run();
-	}
+  @Tested
+  private SimpleTopology topology;
+
+  @Mocked
+  private SimpleEntranceProcessingItem entrancePi;
+  @Mocked
+  private EntranceProcessor entranceProcessor;
+
+  @Before
+  public void setUp() throws Exception {
+    topology = new SimpleTopology("TestTopology");
+  }
+
+  @Test
+  public void testAddEntrancePi() {
+    topology.addEntranceProcessingItem(entrancePi);
+
+    Set<EntranceProcessingItem> entrancePIs = topology.getEntranceProcessingItems();
+    assertNotNull("Set of entrance PIs is null.", entrancePIs);
+    assertEquals("Number of entrance PI in SimpleTopology must be 1", 1, entrancePIs.size());
+    assertSame("Entrance PI was not set correctly.", entrancePi, entrancePIs.toArray()[0]);
+    // TODO: verify that entrance PI is in the set of ProcessingItems
+    // Need to access topology's set of PIs (getProcessingItems() method)
+  }
+
+  @Test
+  public void testRun() {
+    topology.addEntranceProcessingItem(entrancePi);
+
+    new NonStrictExpectations() {
+      {
+        entrancePi.getProcessor();
+        result = entranceProcessor;
+
+      }
+    };
+
+    new Expectations() {
+      {
+        entranceProcessor.onCreate(anyInt);
+        entrancePi.startSendingEvents();
+      }
+    };
+    topology.run();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testRunWithoutEntrancePI() {
+    topology.run();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java
----------------------------------------------------------------------
diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java
index 33299ac..b627416 100644
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java
@@ -40,58 +40,59 @@ import com.yahoo.labs.samoa.topology.Topology;
  */
 public class S4ComponentFactory implements ComponentFactory {
 
-    public static final Logger logger = LoggerFactory.getLogger(S4ComponentFactory.class);
-    protected S4DoTask app;
+  public static final Logger logger = LoggerFactory.getLogger(S4ComponentFactory.class);
+  protected S4DoTask app;
 
-    @Override
-    public ProcessingItem createPi(Processor processor, int paralellism) {
-        S4ProcessingItem processingItem = new S4ProcessingItem(app);
-        // TODO refactor how to set the paralellism level
-        processingItem.setParalellismLevel(paralellism);
-        processingItem.setProcessor(processor);
+  @Override
+  public ProcessingItem createPi(Processor processor, int paralellism) {
+    S4ProcessingItem processingItem = new S4ProcessingItem(app);
+    // TODO refactor how to set the paralellism level
+    processingItem.setParalellismLevel(paralellism);
+    processingItem.setProcessor(processor);
 
-        return processingItem;
-    }
+    return processingItem;
+  }
 
-    @Override
-    public ProcessingItem createPi(Processor processor) {
-        return this.createPi(processor, 1);
-    }
+  @Override
+  public ProcessingItem createPi(Processor processor) {
+    return this.createPi(processor, 1);
+  }
 
-    @Override
-    public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) {
-        // TODO Create source Entry processing item that connects to an external stream
-        S4EntranceProcessingItem entrancePi = new S4EntranceProcessingItem(entranceProcessor, app);
-        entrancePi.setParallelism(1); // FIXME should not be set to 1 statically
-        return entrancePi;
-    }
+  @Override
+  public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) {
+    // TODO Create source Entry processing item that connects to an external
+    // stream
+    S4EntranceProcessingItem entrancePi = new S4EntranceProcessingItem(entranceProcessor, app);
+    entrancePi.setParallelism(1); // FIXME should not be set to 1 statically
+    return entrancePi;
+  }
 
-    @Override
-    public Stream createStream(IProcessingItem sourcePi) {
-        S4Stream aStream = new S4Stream(app);
-        return aStream;
-    }
+  @Override
+  public Stream createStream(IProcessingItem sourcePi) {
+    S4Stream aStream = new S4Stream(app);
+    return aStream;
+  }
 
-    @Override
-    public Topology createTopology(String topoName) {
-        return new S4Topology(topoName);
-    }
+  @Override
+  public Topology createTopology(String topoName) {
+    return new S4Topology(topoName);
+  }
 
-    /**
-     * Initialization method.
-     * 
-     * @param evalTask
-     */
-    public void init(String evalTask) {
-        // Task is initiated in the DoTaskApp
-    }
+  /**
+   * Initialization method.
+   * 
+   * @param evalTask
+   */
+  public void init(String evalTask) {
+    // Task is initiated in the DoTaskApp
+  }
 
-    /**
-     * Sets S4 application.
-     * 
-     * @param app
-     */
-    public void setApp(S4DoTask app) {
-        this.app = app;
-    }
+  /**
+   * Sets S4 application.
+   * 
+   * @param app
+   */
+  public void setApp(S4DoTask app) {
+    this.app = app;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java
----------------------------------------------------------------------
diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java
index 0f474a4..3691a82 100644
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java
@@ -56,208 +56,213 @@ import com.google.inject.name.Named;
  */
 final public class S4DoTask extends App {
 
-    private final Logger logger = LoggerFactory.getLogger(S4DoTask.class);
-    Task task;
+  private final Logger logger = LoggerFactory.getLogger(S4DoTask.class);
+  Task task;
 
-    @Inject @Named("evalTask") public String evalTask;
+  @Inject
+  @Named("evalTask")
+  public String evalTask;
 
-    public S4DoTask() {
-        super();
-    }
-
-    /** The engine. */
-    protected ComponentFactory componentFactory;
-
-    /**
-     * Gets the factory.
-     * 
-     * @return the factory
-     */
-    public ComponentFactory getFactory() {
-        return componentFactory;
-    }
+  public S4DoTask() {
+    super();
+  }
 
-    /**
-     * Sets the factory.
-     * 
-     * @param factory
-     *            the new factory
-     */
-    public void setFactory(ComponentFactory factory) {
-        this.componentFactory = factory;
-    }
+  /** The engine. */
+  protected ComponentFactory componentFactory;
 
-    /*
-     * Build the application
-     * 
-     * @see org.apache.s4.core.App#onInit()
-     */
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.s4.core.App#onInit()
-     */
-    @Override
-    protected void onInit() {
-        logger.info("DoTaskApp onInit");
-        // ConsoleReporters prints S4 metrics
-        // MetricsRegistry mr = new MetricsRegistry();
-        //
-        // CsvReporter.enable(new File(System.getProperty("user.home")
-        // + "/monitor/"), 10, TimeUnit.SECONDS);
-        // ConsoleReporter.enable(10, TimeUnit.SECONDS);
-        try {
-            System.err.println();
-            System.err.println(Globals.getWorkbenchInfoString());
-            System.err.println();
+  /**
+   * Gets the factory.
+   * 
+   * @return the factory
+   */
+  public ComponentFactory getFactory() {
+    return componentFactory;
+  }
 
-        } catch (Exception ex) {
-            ex.printStackTrace();
-        }
-        S4ComponentFactory factory = new S4ComponentFactory();
-        factory.setApp(this);
+  /**
+   * Sets the factory.
+   * 
+   * @param factory
+   *          the new factory
+   */
+  public void setFactory(ComponentFactory factory) {
+    this.componentFactory = factory;
+  }
 
-        // logger.debug("LC {}", lc);
+  /*
+   * Build the application
+   * 
+   * @see org.apache.s4.core.App#onInit()
+   */
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.s4.core.App#onInit()
+   */
+  @Override
+  protected void onInit() {
+    logger.info("DoTaskApp onInit");
+    // ConsoleReporter prints S4 metrics
+    // MetricsRegistry mr = new MetricsRegistry();
+    //
+    // CsvReporter.enable(new File(System.getProperty("user.home")
+    // + "/monitor/"), 10, TimeUnit.SECONDS);
+    // ConsoleReporter.enable(10, TimeUnit.SECONDS);
+    try {
+      System.err.println();
+      System.err.println(Globals.getWorkbenchInfoString());
+      System.err.println();
 
-        // task = TaskProvider.getTask(evalTask);
+    } catch (Exception ex) {
+      ex.printStackTrace();
+    }
+    S4ComponentFactory factory = new S4ComponentFactory();
+    factory.setApp(this);
 
-        // EXAMPLE OPTIONS
-        // -l Clustream -g Clustream -i 100000 -s (RandomRBFGeneratorEvents -K
-        // 5 -N 0.0)
-        // String[] args = new String[] {evalTask,"-l", "Clustream","-g",
-        // "Clustream", "-i", "100000", "-s", "(RamdomRBFGeneratorsEvents",
-        // "-K", "5", "-N", "0.0)"};
-        // String[] args = new String[] { evalTask, "-l", "clustream.Clustream",
-        // "-g", "clustream.Clustream", "-i", "100000", "-s",
-        // "(RandomRBFGeneratorEvents", "-K", "5", "-N", "0.0)" };
-        logger.debug("PARAMETERS {}", evalTask);
-        // params = params.replace(":", " ");
-        List<String> parameters = new ArrayList<String>();
-        // parameters.add(evalTask);
-        try {
-            parameters.addAll(Arrays.asList(URLDecoder.decode(evalTask, "UTF-8").split(" ")));
-        } catch (UnsupportedEncodingException ex) {
-            ex.printStackTrace();
-        }
-        String[] args = parameters.toArray(new String[0]);
-        Option[] extraOptions = new Option[] {};
-        // build a single string by concatenating cli options
-        StringBuilder cliString = new StringBuilder();
-        for (int i = 0; i < args.length; i++) {
-            cliString.append(" ").append(args[i]);
-        }
+    // logger.debug("LC {}", lc);
 
-        // parse options
-        try {
-            task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, extraOptions);
-            task.setFactory(factory);
-            task.init();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+    // task = TaskProvider.getTask(evalTask);
 
+    // EXAMPLE OPTIONS
+    // -l Clustream -g Clustream -i 100000 -s (RandomRBFGeneratorEvents -K
+    // 5 -N 0.0)
+    // String[] args = new String[] {evalTask,"-l", "Clustream","-g",
+    // "Clustream", "-i", "100000", "-s", "(RandomRBFGeneratorEvents",
+    // "-K", "5", "-N", "0.0)"};
+    // String[] args = new String[] { evalTask, "-l", "clustream.Clustream",
+    // "-g", "clustream.Clustream", "-i", "100000", "-s",
+    // "(RandomRBFGeneratorEvents", "-K", "5", "-N", "0.0)" };
+    logger.debug("PARAMETERS {}", evalTask);
+    // params = params.replace(":", " ");
+    List<String> parameters = new ArrayList<String>();
+    // parameters.add(evalTask);
+    try {
+      parameters.addAll(Arrays.asList(URLDecoder.decode(evalTask, "UTF-8").split(" ")));
+    } catch (UnsupportedEncodingException ex) {
+      ex.printStackTrace();
+    }
+    String[] args = parameters.toArray(new String[0]);
+    Option[] extraOptions = new Option[] {};
+    // build a single string by concatenating cli options
+    StringBuilder cliString = new StringBuilder();
+    for (int i = 0; i < args.length; i++) {
+      cliString.append(" ").append(args[i]);
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.s4.core.App#onStart()
-     */
-    @Override
-    protected void onStart() {
-        logger.info("Starting DoTaskApp... App Partition [{}]", this.getPartitionId());
-        // <<<<<<< HEAD Task doesn't have start in latest storm-impl
-        // TODO change the way the app starts
-        // if (this.getPartitionId() == 0)
-        S4Topology s4topology = (S4Topology) getTask().getTopology();
-        S4EntranceProcessingItem epi = (S4EntranceProcessingItem) s4topology.getEntranceProcessingItem();
-        while (epi.injectNextEvent())
-            // inject events from the EntrancePI
-            ;
+    // parse options
+    try {
+      task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, extraOptions);
+      task.setFactory(factory);
+      task.init();
+    } catch (Exception e) {
+      e.printStackTrace();
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.s4.core.App#onClose()
-     */
-    @Override
-    protected void onClose() {
-        System.out.println("Closing DoTaskApp...");
+  }
 
-    }
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.s4.core.App#onStart()
+   */
+  @Override
+  protected void onStart() {
+    logger.info("Starting DoTaskApp... App Partition [{}]", this.getPartitionId());
+    // NOTE: Task doesn't have start in latest storm-impl
+    // TODO change the way the app starts
+    // if (this.getPartitionId() == 0)
+    S4Topology s4topology = (S4Topology) getTask().getTopology();
+    S4EntranceProcessingItem epi = (S4EntranceProcessingItem) s4topology.getEntranceProcessingItem();
+    while (epi.injectNextEvent())
+      // inject events from the EntrancePI
+      ;
+  }
 
-    /**
-     * Gets the task.
-     * 
-     * @return the task
-     */
-    public Task getTask() {
-        return task;
-    }
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.s4.core.App#onClose()
+   */
+  @Override
+  protected void onClose() {
+    System.out.println("Closing DoTaskApp...");
 
-    // These methods are protected in App and can not be accessed from outside.
-    // They are
-    // called from parallel classifiers and evaluations. Is there a better way
-    // to do that?
+  }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.s4.core.App#createPE(java.lang.Class)
-     */
-    @Override
-    public <T extends ProcessingElement> T createPE(Class<T> type) {
-        return super.createPE(type);
-    }
+  /**
+   * Gets the task.
+   * 
+   * @return the task
+   */
+  public Task getTask() {
+    return task;
+  }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.s4.core.App#createStream(java.lang.String, org.apache.s4.base.KeyFinder, org.apache.s4.core.ProcessingElement[])
-     */
-    @Override
-    public <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder, ProcessingElement... processingElements) {
-        return super.createStream(name, finder, processingElements);
-    }
+  // These methods are protected in App and cannot be accessed from outside,
+  // so they are overridden here as public. They are called from parallel
+  // classifiers and evaluations.
+  // TODO: is there a better way to do that?
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.s4.core.App#createStream(java.lang.String, org.apache.s4.core.ProcessingElement[])
-     */
-    @Override
-    public <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) {
-        return super.createStream(name, processingElements);
-    }
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.s4.core.App#createPE(java.lang.Class)
+   */
+  @Override
+  public <T extends ProcessingElement> T createPE(Class<T> type) {
+    return super.createPE(type);
+  }
 
-    // @com.beust.jcommander.Parameters(separators = "=")
-    // class Parameters {
-    //
-    // @Parameter(names={"-lc","-local"}, description="Local clustering method")
-    // private String localClustering;
-    //
-    // @Parameter(names={"-gc","-global"},
-    // description="Global clustering method")
-    // private String globalClustering;
-    //
-    // }
-    //
-    // class ParametersConverter {// implements IStringConverter<String[]> {
-    //
-    //
-    // public String[] convertToArgs(String value) {
-    //
-    // String[] params = value.split(",");
-    // String[] args = new String[params.length*2];
-    // for(int i=0; i<params.length ; i++) {
-    // args[i] = params[i].split("=")[0];
-    // args[i+1] = params[i].split("=")[1];
-    // i++;
-    // }
-    // return args;
-    // }
-    //
-    // }
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.s4.core.App#createStream(java.lang.String,
+   * org.apache.s4.base.KeyFinder, org.apache.s4.core.ProcessingElement[])
+   */
+  @Override
+  public <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder,
+      ProcessingElement... processingElements) {
+    return super.createStream(name, finder, processingElements);
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.s4.core.App#createStream(java.lang.String,
+   * org.apache.s4.core.ProcessingElement[])
+   */
+  @Override
+  public <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) {
+    return super.createStream(name, processingElements);
+  }
+
+  // @com.beust.jcommander.Parameters(separators = "=")
+  // class Parameters {
+  //
+  // @Parameter(names={"-lc","-local"}, description="Local clustering method")
+  // private String localClustering;
+  //
+  // @Parameter(names={"-gc","-global"},
+  // description="Global clustering method")
+  // private String globalClustering;
+  //
+  // }
+  //
+  // class ParametersConverter {// implements IStringConverter<String[]> {
+  //
+  //
+  // public String[] convertToArgs(String value) {
+  //
+  // String[] params = value.split(",");
+  // String[] args = new String[params.length*2];
+  // for(int i=0; i<params.length ; i++) {
+  // args[i] = params[i].split("=")[0];
+  // args[i+1] = params[i].split("=")[1];
+  // i++;
+  // }
+  // return args;
+  // }
+  //
+  // }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java
----------------------------------------------------------------------
diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java
index 2b0c595..6f374fa 100644
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java
@@ -32,89 +32,89 @@ import com.yahoo.labs.samoa.topology.Stream;
 
 public class S4EntranceProcessingItem extends ProcessingElement implements EntranceProcessingItem {
 
-    private EntranceProcessor entranceProcessor;
-    // private S4DoTask app;
-    private int parallelism;
-    protected Stream outputStream;
-
-    /**
-     * Constructor of an S4 entrance processing item.
-     * 
-     * @param app
-     *            : S4 application
-     */
-    public S4EntranceProcessingItem(EntranceProcessor entranceProcessor, App app) {
-        super(app);
-        this.entranceProcessor = entranceProcessor;
-        // this.app = (S4DoTask) app;
-        // this.setSingleton(true);
-    }
-
-    public void setParallelism(int parallelism) {
-        this.parallelism = parallelism;
-    }
-
-    public int getParallelism() {
-        return this.parallelism;
-    }
-
-    @Override
-    public EntranceProcessor getProcessor() {
-        return this.entranceProcessor;
-    }
-
-    //
-    // @Override
-    // public void put(Instance inst) {
-    // // do nothing
-    // // may not needed
-    // }
-
-    @Override
-    protected void onCreate() {
-        // was commented
-        if (this.entranceProcessor != null) {
-            // TODO revisit if we need to change it to a clone() call
-            this.entranceProcessor = (EntranceProcessor) this.entranceProcessor.newProcessor(this.entranceProcessor);
-            this.entranceProcessor.onCreate(Integer.parseInt(getId()));
-        }
-    }
-
-    @Override
-    protected void onRemove() {
-        // do nothing
-    }
-
-    //
-    // /**
-    // * Sets the entrance processing item processor.
-    // *
-    // * @param processor
-    // */
-    // public void setProcessor(Processor processor) {
-    // this.entranceProcessor = processor;
-    // }
-
-    @Override
-    public void setName(String name) {
-        super.setName(name);
-    }
-
-    @Override
-    public EntranceProcessingItem setOutputStream(Stream stream) {
-        if (this.outputStream != null)
-            throw new IllegalStateException("Output stream for an EntrancePI sohuld be initialized only once");
-        this.outputStream = stream;
-        return this;
-    }
-
-    public boolean injectNextEvent() {
-        if (entranceProcessor.hasNext()) {
-            ContentEvent nextEvent = this.entranceProcessor.nextEvent();
-            outputStream.put(nextEvent);
-            return entranceProcessor.hasNext();
-        } else
-            return false;
-        // return !nextEvent.isLastEvent();
+  private EntranceProcessor entranceProcessor;
+  // private S4DoTask app;
+  private int parallelism;
+  protected Stream outputStream;
+
+  /**
+   * Constructor of an S4 entrance processing item.
+   * 
+   * @param app
+   *          : S4 application
+   */
+  public S4EntranceProcessingItem(EntranceProcessor entranceProcessor, App app) {
+    super(app);
+    this.entranceProcessor = entranceProcessor;
+    // this.app = (S4DoTask) app;
+    // this.setSingleton(true);
+  }
+
+  public void setParallelism(int parallelism) {
+    this.parallelism = parallelism;
+  }
+
+  public int getParallelism() {
+    return this.parallelism;
+  }
+
+  @Override
+  public EntranceProcessor getProcessor() {
+    return this.entranceProcessor;
+  }
+
+  //
+  // @Override
+  // public void put(Instance inst) {
+  // // do nothing
+  // // may not be needed
+  // }
+
+  @Override
+  protected void onCreate() {
+    // was commented
+    if (this.entranceProcessor != null) {
+      // TODO revisit if we need to change it to a clone() call
+      this.entranceProcessor = (EntranceProcessor) this.entranceProcessor.newProcessor(this.entranceProcessor);
+      this.entranceProcessor.onCreate(Integer.parseInt(getId()));
     }
+  }
+
+  @Override
+  protected void onRemove() {
+    // do nothing
+  }
+
+  //
+  // /**
+  // * Sets the entrance processing item processor.
+  // *
+  // * @param processor
+  // */
+  // public void setProcessor(Processor processor) {
+  // this.entranceProcessor = processor;
+  // }
+
+  @Override
+  public void setName(String name) {
+    super.setName(name);
+  }
+
+  @Override
+  public EntranceProcessingItem setOutputStream(Stream stream) {
+    if (this.outputStream != null)
+      throw new IllegalStateException("Output stream for an EntrancePI sohuld be initialized only once");
+    this.outputStream = stream;
+    return this;
+  }
+
+  public boolean injectNextEvent() {
+    if (entranceProcessor.hasNext()) {
+      ContentEvent nextEvent = this.entranceProcessor.nextEvent();
+      outputStream.put(nextEvent);
+      return entranceProcessor.hasNext();
+    } else
+      return false;
+    // return !nextEvent.isLastEvent();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java
----------------------------------------------------------------------
diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java
index 8f8ad9f..62c623c 100644
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java
@@ -36,55 +36,57 @@ import com.yahoo.labs.samoa.core.ContentEvent;
 @Immutable
 final public class S4Event extends Event {
 
-	private String key;
-	
-	public String getKey() {
-		return key;
-	}
-
-	public void setKey(String key) {
-		this.key = key;
-	}
-
-	/** The content event. */
-	private ContentEvent contentEvent;
-	
-	/**
-	 * Instantiates a new instance event.
-	 */
-	public S4Event() {
-		// Needed for serialization of kryo
-	}
-
-	/**
-	 * Instantiates a new instance event.
-	 *
-	 * @param contentEvent the content event
-	 */
-	public S4Event(ContentEvent contentEvent) {
-		if (contentEvent != null) {
-			this.contentEvent = contentEvent;
-			this.key = contentEvent.getKey();
-			
-		}
-	}
-
-	/**
-	 * Gets the content event.
-	 *
-	 * @return the content event
-	 */
-	public ContentEvent getContentEvent() {
-		return contentEvent;
-	}
-
-	/**
-	 * Sets the content event.
-	 *
-	 * @param contentEvent the new content event
-	 */
-	public void setContentEvent(ContentEvent contentEvent) {
-		this.contentEvent = contentEvent;
-	}
+  private String key;
+
+  public String getKey() {
+    return key;
+  }
+
+  public void setKey(String key) {
+    this.key = key;
+  }
+
+  /** The content event. */
+  private ContentEvent contentEvent;
+
+  /**
+   * Instantiates a new instance event.
+   */
+  public S4Event() {
+    // Needed for serialization of kryo
+  }
+
+  /**
+   * Instantiates a new instance event.
+   * 
+   * @param contentEvent
+   *          the content event
+   */
+  public S4Event(ContentEvent contentEvent) {
+    if (contentEvent != null) {
+      this.contentEvent = contentEvent;
+      this.key = contentEvent.getKey();
+
+    }
+  }
+
+  /**
+   * Gets the content event.
+   * 
+   * @return the content event
+   */
+  public ContentEvent getContentEvent() {
+    return contentEvent;
+  }
+
+  /**
+   * Sets the content event.
+   * 
+   * @param contentEvent
+   *          the new content event
+   */
+  public void setContentEvent(ContentEvent contentEvent) {
+    this.contentEvent = contentEvent;
+  }
 
 }