You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samoa.apache.org by mm...@apache.org on 2015/02/04 16:00:22 UTC
[06/43] incubator-samoa git commit: Format Java source files
according to the eclipse-format.xml standard: spaces instead of tabs,
an indent size of 2 spaces, and line wrap at 120 columns.
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java
----------------------------------------------------------------------
diff --git a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java
index e917844..1db95d0 100644
--- a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java
+++ b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java
@@ -25,118 +25,118 @@ package com.yahoo.labs.samoa.instances;
*/
/**
- *
+ *
* @author abifet
*/
-public class SparseInstanceData implements InstanceData{
-
- public SparseInstanceData(double[] attributeValues, int[] indexValues, int numberAttributes) {
- this.attributeValues = attributeValues;
- this.indexValues = indexValues;
- this.numberAttributes = numberAttributes;
- }
-
- public SparseInstanceData(int length) {
- this.attributeValues = new double[length];
- this.indexValues = new int[length];
- }
-
-
- protected double[] attributeValues;
+public class SparseInstanceData implements InstanceData {
- public double[] getAttributeValues() {
- return attributeValues;
- }
+ public SparseInstanceData(double[] attributeValues, int[] indexValues, int numberAttributes) {
+ this.attributeValues = attributeValues;
+ this.indexValues = indexValues;
+ this.numberAttributes = numberAttributes;
+ }
- public void setAttributeValues(double[] attributeValues) {
- this.attributeValues = attributeValues;
- }
+ public SparseInstanceData(int length) {
+ this.attributeValues = new double[length];
+ this.indexValues = new int[length];
+ }
- public int[] getIndexValues() {
- return indexValues;
- }
+ protected double[] attributeValues;
- public void setIndexValues(int[] indexValues) {
- this.indexValues = indexValues;
- }
+ public double[] getAttributeValues() {
+ return attributeValues;
+ }
- public int getNumberAttributes() {
- return numberAttributes;
- }
+ public void setAttributeValues(double[] attributeValues) {
+ this.attributeValues = attributeValues;
+ }
- public void setNumberAttributes(int numberAttributes) {
- this.numberAttributes = numberAttributes;
- }
- protected int[] indexValues;
- protected int numberAttributes;
+ public int[] getIndexValues() {
+ return indexValues;
+ }
- @Override
- public int numAttributes() {
- return this.numberAttributes;
- }
+ public void setIndexValues(int[] indexValues) {
+ this.indexValues = indexValues;
+ }
+
+ public int getNumberAttributes() {
+ return numberAttributes;
+ }
+
+ public void setNumberAttributes(int numberAttributes) {
+ this.numberAttributes = numberAttributes;
+ }
+
+ protected int[] indexValues;
+ protected int numberAttributes;
- @Override
- public double value(int indexAttribute) {
- int location = locateIndex(indexAttribute);
- //return location == -1 ? 0 : this.attributeValues[location];
- // int index = locateIndex(attIndex);
+ @Override
+ public int numAttributes() {
+ return this.numberAttributes;
+ }
+
+ @Override
+ public double value(int indexAttribute) {
+ int location = locateIndex(indexAttribute);
+ // return location == -1 ? 0 : this.attributeValues[location];
+ // int index = locateIndex(attIndex);
if ((location >= 0) && (indexValues[location] == indexAttribute)) {
return attributeValues[location];
} else {
return 0.0;
}
- }
+ }
- @Override
- public boolean isMissing(int indexAttribute) {
- return Double.isNaN(this.value(indexAttribute));
- }
+ @Override
+ public boolean isMissing(int indexAttribute) {
+ return Double.isNaN(this.value(indexAttribute));
+ }
- @Override
- public int numValues() {
- return this.attributeValues.length;
- }
+ @Override
+ public int numValues() {
+ return this.attributeValues.length;
+ }
- @Override
- public int index(int indexAttribute) {
- return this.indexValues[indexAttribute];
- }
+ @Override
+ public int index(int indexAttribute) {
+ return this.indexValues[indexAttribute];
+ }
- @Override
- public double valueSparse(int indexAttribute) {
- return this.attributeValues[indexAttribute];
- }
+ @Override
+ public double valueSparse(int indexAttribute) {
+ return this.attributeValues[indexAttribute];
+ }
- @Override
- public boolean isMissingSparse(int indexAttribute) {
- return Double.isNaN(this.valueSparse(indexAttribute));
- }
+ @Override
+ public boolean isMissingSparse(int indexAttribute) {
+ return Double.isNaN(this.valueSparse(indexAttribute));
+ }
+
+ /*
+ * @Override public double value(Attribute attribute) { return
+ * value(attribute.index()); }
+ */
- /*@Override
- public double value(Attribute attribute) {
- return value(attribute.index());
- }*/
-
- @Override
- public double[] toDoubleArray() {
- double[] array = new double[numAttributes()];
- for (int i=0; i<numValues() ; i++) {
- array[index(i)] = valueSparse(i);
- }
- return array;
+ @Override
+ public double[] toDoubleArray() {
+ double[] array = new double[numAttributes()];
+ for (int i = 0; i < numValues(); i++) {
+ array[index(i)] = valueSparse(i);
}
+ return array;
+ }
- @Override
- public void setValue(int attributeIndex, double d) {
- int index = locateIndex(attributeIndex);
- if (index(index) == attributeIndex) {
- this.attributeValues[index] = d;
- } else {
- // We need to add the value
- }
+ @Override
+ public void setValue(int attributeIndex, double d) {
+ int index = locateIndex(attributeIndex);
+ if (index(index) == attributeIndex) {
+ this.attributeValues[index] = d;
+ } else {
+ // We need to add the value
}
-
- /**
+ }
+
+ /**
* Locates the greatest index that is not greater than the given index.
*
* @return the internal index of the attribute index. Returns -1 if no index
@@ -168,5 +168,5 @@ public class SparseInstanceData implements InstanceData{
return min - 1;
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java
----------------------------------------------------------------------
diff --git a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java
index f3dc1b9..dd9df6d 100644
--- a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java
+++ b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java
@@ -21,68 +21,71 @@ package com.yahoo.labs.samoa.instances;
*/
public class Utils {
- public static int maxIndex(double[] doubles) {
+ public static int maxIndex(double[] doubles) {
- double maximum = 0;
- int maxIndex = 0;
+ double maximum = 0;
+ int maxIndex = 0;
- for (int i = 0; i < doubles.length; i++) {
- if ((i == 0) || (doubles[i] > maximum)) {
- maxIndex = i;
- maximum = doubles[i];
- }
- }
-
- return maxIndex;
+ for (int i = 0; i < doubles.length; i++) {
+ if ((i == 0) || (doubles[i] > maximum)) {
+ maxIndex = i;
+ maximum = doubles[i];
+ }
}
- public static String quote(String string) {
- boolean quote = false;
+ return maxIndex;
+ }
- // backquote the following characters
- if ((string.indexOf('\n') != -1) || (string.indexOf('\r') != -1) || (string.indexOf('\'') != -1) || (string.indexOf('"') != -1)
- || (string.indexOf('\\') != -1) || (string.indexOf('\t') != -1) || (string.indexOf('%') != -1) || (string.indexOf('\u001E') != -1)) {
- string = backQuoteChars(string);
- quote = true;
- }
+ public static String quote(String string) {
+ boolean quote = false;
- // Enclose the string in 's if the string contains a recently added
- // backquote or contains one of the following characters.
- if ((quote == true) || (string.indexOf('{') != -1) || (string.indexOf('}') != -1) || (string.indexOf(',') != -1) || (string.equals("?"))
- || (string.indexOf(' ') != -1) || (string.equals(""))) {
- string = ("'".concat(string)).concat("'");
- }
+ // backquote the following characters
+ if ((string.indexOf('\n') != -1) || (string.indexOf('\r') != -1) || (string.indexOf('\'') != -1)
+ || (string.indexOf('"') != -1)
+ || (string.indexOf('\\') != -1) || (string.indexOf('\t') != -1) || (string.indexOf('%') != -1)
+ || (string.indexOf('\u001E') != -1)) {
+ string = backQuoteChars(string);
+ quote = true;
+ }
- return string;
+ // Enclose the string in 's if the string contains a recently added
+ // backquote or contains one of the following characters.
+ if ((quote == true) || (string.indexOf('{') != -1) || (string.indexOf('}') != -1) || (string.indexOf(',') != -1)
+ || (string.equals("?"))
+ || (string.indexOf(' ') != -1) || (string.equals(""))) {
+ string = ("'".concat(string)).concat("'");
}
- public static String backQuoteChars(String string) {
+ return string;
+ }
- int index;
- StringBuffer newStringBuffer;
+ public static String backQuoteChars(String string) {
- // replace each of the following characters with the backquoted version
- char charsFind[] = { '\\', '\'', '\t', '\n', '\r', '"', '%', '\u001E' };
- String charsReplace[] = { "\\\\", "\\'", "\\t", "\\n", "\\r", "\\\"", "\\%", "\\u001E" };
- for (int i = 0; i < charsFind.length; i++) {
- if (string.indexOf(charsFind[i]) != -1) {
- newStringBuffer = new StringBuffer();
- while ((index = string.indexOf(charsFind[i])) != -1) {
- if (index > 0) {
- newStringBuffer.append(string.substring(0, index));
- }
- newStringBuffer.append(charsReplace[i]);
- if ((index + 1) < string.length()) {
- string = string.substring(index + 1);
- } else {
- string = "";
- }
- }
- newStringBuffer.append(string);
- string = newStringBuffer.toString();
- }
- }
+ int index;
+ StringBuffer newStringBuffer;
- return string;
+ // replace each of the following characters with the backquoted version
+ char charsFind[] = { '\\', '\'', '\t', '\n', '\r', '"', '%', '\u001E' };
+ String charsReplace[] = { "\\\\", "\\'", "\\t", "\\n", "\\r", "\\\"", "\\%", "\\u001E" };
+ for (int i = 0; i < charsFind.length; i++) {
+ if (string.indexOf(charsFind[i]) != -1) {
+ newStringBuffer = new StringBuffer();
+ while ((index = string.indexOf(charsFind[i])) != -1) {
+ if (index > 0) {
+ newStringBuffer.append(string.substring(0, index));
+ }
+ newStringBuffer.append(charsReplace[i]);
+ if ((index + 1) < string.length()) {
+ string = string.substring(index + 1);
+ } else {
+ string = "";
+ }
+ }
+ newStringBuffer.append(string);
+ string = newStringBuffer.toString();
+ }
}
+
+ return string;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java
index 05ee1e1..27ca7a1 100644
--- a/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java
+++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java
@@ -36,54 +36,55 @@ import com.yahoo.labs.samoa.topology.impl.SimpleEngine;
*/
public class LocalDoTask {
- // TODO: clean up this class for helping ML Developer in SAMOA
- // TODO: clean up code from storm-impl
-
- // It seems that the 3 extra options are not used.
- // Probably should remove them
- private static final String SUPPRESS_STATUS_OUT_MSG = "Suppress the task status output. Normally it is sent to stderr.";
- private static final String SUPPRESS_RESULT_OUT_MSG = "Suppress the task result output. Normally it is sent to stdout.";
- private static final String STATUS_UPDATE_FREQ_MSG = "Wait time in milliseconds between status updates.";
- private static final Logger logger = LoggerFactory.getLogger(LocalDoTask.class);
+ // TODO: clean up this class for helping ML Developer in SAMOA
+ // TODO: clean up code from storm-impl
- /**
- * The main method.
- *
- * @param args
- * the arguments
- */
- public static void main(String[] args) {
+ // It seems that the 3 extra options are not used.
+ // Probably should remove them
+ private static final String SUPPRESS_STATUS_OUT_MSG = "Suppress the task status output. Normally it is sent to stderr.";
+ private static final String SUPPRESS_RESULT_OUT_MSG = "Suppress the task result output. Normally it is sent to stdout.";
+ private static final String STATUS_UPDATE_FREQ_MSG = "Wait time in milliseconds between status updates.";
+ private static final Logger logger = LoggerFactory.getLogger(LocalDoTask.class);
- // ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
+ /**
+ * The main method.
+ *
+ * @param args
+ * the arguments
+ */
+ public static void main(String[] args) {
- // args = tmpArgs.toArray(new String[0]);
+ // ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
- FlagOption suppressStatusOutOpt = new FlagOption("suppressStatusOut", 'S', SUPPRESS_STATUS_OUT_MSG);
+ // args = tmpArgs.toArray(new String[0]);
- FlagOption suppressResultOutOpt = new FlagOption("suppressResultOut", 'R', SUPPRESS_RESULT_OUT_MSG);
+ FlagOption suppressStatusOutOpt = new FlagOption("suppressStatusOut", 'S', SUPPRESS_STATUS_OUT_MSG);
- IntOption statusUpdateFreqOpt = new IntOption("statusUpdateFrequency", 'F', STATUS_UPDATE_FREQ_MSG, 1000, 0, Integer.MAX_VALUE);
+ FlagOption suppressResultOutOpt = new FlagOption("suppressResultOut", 'R', SUPPRESS_RESULT_OUT_MSG);
- Option[] extraOptions = new Option[] { suppressStatusOutOpt, suppressResultOutOpt, statusUpdateFreqOpt };
+ IntOption statusUpdateFreqOpt = new IntOption("statusUpdateFrequency", 'F', STATUS_UPDATE_FREQ_MSG, 1000, 0,
+ Integer.MAX_VALUE);
- StringBuilder cliString = new StringBuilder();
- for (String arg : args) {
- cliString.append(" ").append(arg);
- }
- logger.debug("Command line string = {}", cliString.toString());
- System.out.println("Command line string = " + cliString.toString());
+ Option[] extraOptions = new Option[] { suppressStatusOutOpt, suppressResultOutOpt, statusUpdateFreqOpt };
- Task task;
- try {
- task = ClassOption.cliStringToObject(cliString.toString(), Task.class, extraOptions);
- logger.info("Successfully instantiating {}", task.getClass().getCanonicalName());
- } catch (Exception e) {
- logger.error("Fail to initialize the task", e);
- System.out.println("Fail to initialize the task" + e);
- return;
- }
- task.setFactory(new SimpleComponentFactory());
- task.init();
- SimpleEngine.submitTopology(task.getTopology());
+ StringBuilder cliString = new StringBuilder();
+ for (String arg : args) {
+ cliString.append(" ").append(arg);
}
+ logger.debug("Command line string = {}", cliString.toString());
+ System.out.println("Command line string = " + cliString.toString());
+
+ Task task;
+ try {
+ task = ClassOption.cliStringToObject(cliString.toString(), Task.class, extraOptions);
+ logger.info("Successfully instantiating {}", task.getClass().getCanonicalName());
+ } catch (Exception e) {
+ logger.error("Fail to initialize the task", e);
+ System.out.println("Fail to initialize the task" + e);
+ return;
+ }
+ task.setFactory(new SimpleComponentFactory());
+ task.init();
+ SimpleEngine.submitTopology(task.getTopology());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java
index b289dbe..0c2f301 100644
--- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java
+++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java
@@ -31,23 +31,23 @@ import com.yahoo.labs.samoa.topology.Topology;
public class SimpleComponentFactory implements ComponentFactory {
- public ProcessingItem createPi(Processor processor, int paralellism) {
- return new SimpleProcessingItem(processor, paralellism);
- }
+ public ProcessingItem createPi(Processor processor, int paralellism) {
+ return new SimpleProcessingItem(processor, paralellism);
+ }
- public ProcessingItem createPi(Processor processor) {
- return this.createPi(processor, 1);
- }
+ public ProcessingItem createPi(Processor processor) {
+ return this.createPi(processor, 1);
+ }
- public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
- return new SimpleEntranceProcessingItem(processor);
- }
+ public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
+ return new SimpleEntranceProcessingItem(processor);
+ }
- public Stream createStream(IProcessingItem sourcePi) {
- return new SimpleStream(sourcePi);
- }
+ public Stream createStream(IProcessingItem sourcePi) {
+ return new SimpleStream(sourcePi);
+ }
- public Topology createTopology(String topoName) {
- return new SimpleTopology(topoName);
- }
+ public Topology createTopology(String topoName) {
+ return new SimpleTopology(topoName);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java
index 9d131e1..5ca5837 100644
--- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java
+++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java
@@ -28,10 +28,10 @@ import com.yahoo.labs.samoa.topology.Topology;
public class SimpleEngine {
- public static void submitTopology(Topology topology) {
- SimpleTopology simpleTopology = (SimpleTopology) topology;
- simpleTopology.run();
- // runs until completion
- }
+ public static void submitTopology(Topology topology) {
+ SimpleTopology simpleTopology = (SimpleTopology) topology;
+ simpleTopology.run();
+ // runs until completion
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java
index 4652ebb..c9cc601 100644
--- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java
+++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java
@@ -24,10 +24,10 @@ import com.yahoo.labs.samoa.core.EntranceProcessor;
import com.yahoo.labs.samoa.topology.LocalEntranceProcessingItem;
class SimpleEntranceProcessingItem extends LocalEntranceProcessingItem {
- public SimpleEntranceProcessingItem(EntranceProcessor processor) {
- super(processor);
- }
-
- // The default waiting time when there is no available events is 100ms
- // Override waitForNewEvents() to change it
+ public SimpleEntranceProcessingItem(EntranceProcessor processor) {
+ super(processor);
+ }
+
+ // The default waiting time when there is no available events is 100ms
+ // Override waitForNewEvents() to change it
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java
index e3cc765..77361b1 100644
--- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java
+++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java
@@ -34,54 +34,54 @@ import com.yahoo.labs.samoa.utils.PartitioningScheme;
import com.yahoo.labs.samoa.utils.StreamDestination;
/**
- *
+ *
* @author abifet
*/
class SimpleProcessingItem extends AbstractProcessingItem {
- private IProcessingItem[] arrayProcessingItem;
+ private IProcessingItem[] arrayProcessingItem;
- SimpleProcessingItem(Processor processor) {
- super(processor);
- }
-
- SimpleProcessingItem(Processor processor, int parallelism) {
- super(processor);
- this.setParallelism(parallelism);
- }
-
- public IProcessingItem getProcessingItem(int i) {
- return arrayProcessingItem[i];
- }
-
- @Override
- protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) {
- StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme);
- ((SimpleStream)inputStream).addDestination(destination);
- return this;
- }
+ SimpleProcessingItem(Processor processor) {
+ super(processor);
+ }
- public SimpleProcessingItem copy() {
- Processor processor = this.getProcessor();
- return new SimpleProcessingItem(processor.newProcessor(processor));
- }
+ SimpleProcessingItem(Processor processor, int parallelism) {
+ super(processor);
+ this.setParallelism(parallelism);
+ }
- public void processEvent(ContentEvent event, int counter) {
-
- int parallelism = this.getParallelism();
- //System.out.println("Process event "+event+" (isLast="+event.isLastEvent()+") with counter="+counter+" while parallelism="+parallelism);
- if (this.arrayProcessingItem == null && parallelism > 0) {
- //Init processing elements, the first time they are needed
- this.arrayProcessingItem = new IProcessingItem[parallelism];
- for (int j = 0; j < parallelism; j++) {
- arrayProcessingItem[j] = this.copy();
- arrayProcessingItem[j].getProcessor().onCreate(j);
- }
- }
- if (this.arrayProcessingItem != null) {
- IProcessingItem pi = this.getProcessingItem(counter);
- Processor p = pi.getProcessor();
- //System.out.println("PI="+pi+", p="+p);
- this.getProcessingItem(counter).getProcessor().process(event);
- }
+ public IProcessingItem getProcessingItem(int i) {
+ return arrayProcessingItem[i];
+ }
+
+ @Override
+ protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) {
+ StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme);
+ ((SimpleStream) inputStream).addDestination(destination);
+ return this;
+ }
+
+ public SimpleProcessingItem copy() {
+ Processor processor = this.getProcessor();
+ return new SimpleProcessingItem(processor.newProcessor(processor));
+ }
+
+ public void processEvent(ContentEvent event, int counter) {
+
+ int parallelism = this.getParallelism();
+ // System.out.println("Process event "+event+" (isLast="+event.isLastEvent()+") with counter="+counter+" while parallelism="+parallelism);
+ if (this.arrayProcessingItem == null && parallelism > 0) {
+ // Init processing elements, the first time they are needed
+ this.arrayProcessingItem = new IProcessingItem[parallelism];
+ for (int j = 0; j < parallelism; j++) {
+ arrayProcessingItem[j] = this.copy();
+ arrayProcessingItem[j].getProcessor().onCreate(j);
+ }
+ }
+ if (this.arrayProcessingItem != null) {
+ IProcessingItem pi = this.getProcessingItem(counter);
+ Processor p = pi.getProcessor();
+ // System.out.println("PI="+pi+", p="+p);
+ this.getProcessingItem(counter).getProcessor().process(event);
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java
index 74684a7..09dc555 100644
--- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java
+++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java
@@ -38,56 +38,58 @@ import com.yahoo.labs.samoa.utils.StreamDestination;
* @author abifet
*/
class SimpleStream extends AbstractStream {
- private List<StreamDestination> destinations;
- private int maxCounter;
- private int eventCounter;
+ private List<StreamDestination> destinations;
+ private int maxCounter;
+ private int eventCounter;
- SimpleStream(IProcessingItem sourcePi) {
- super(sourcePi);
- this.destinations = new LinkedList<>();
- this.eventCounter = 0;
- this.maxCounter = 1;
- }
+ SimpleStream(IProcessingItem sourcePi) {
+ super(sourcePi);
+ this.destinations = new LinkedList<>();
+ this.eventCounter = 0;
+ this.maxCounter = 1;
+ }
- private int getNextCounter() {
- if (maxCounter > 0 && eventCounter >= maxCounter) eventCounter = 0;
- this.eventCounter++;
- return this.eventCounter;
- }
+ private int getNextCounter() {
+ if (maxCounter > 0 && eventCounter >= maxCounter)
+ eventCounter = 0;
+ this.eventCounter++;
+ return this.eventCounter;
+ }
- @Override
- public void put(ContentEvent event) {
- this.put(event, this.getNextCounter());
- }
-
- private void put(ContentEvent event, int counter) {
- SimpleProcessingItem pi;
- int parallelism;
- for (StreamDestination destination:destinations) {
- pi = (SimpleProcessingItem) destination.getProcessingItem();
- parallelism = destination.getParallelism();
- switch (destination.getPartitioningScheme()) {
- case SHUFFLE:
- pi.processEvent(event, counter % parallelism);
- break;
- case GROUP_BY_KEY:
- HashCodeBuilder hb = new HashCodeBuilder();
- hb.append(event.getKey());
- int key = hb.build() % parallelism;
- pi.processEvent(event, key);
- break;
- case BROADCAST:
- for (int p = 0; p < parallelism; p++) {
- pi.processEvent(event, p);
- }
- break;
- }
+ @Override
+ public void put(ContentEvent event) {
+ this.put(event, this.getNextCounter());
+ }
+
+ private void put(ContentEvent event, int counter) {
+ SimpleProcessingItem pi;
+ int parallelism;
+ for (StreamDestination destination : destinations) {
+ pi = (SimpleProcessingItem) destination.getProcessingItem();
+ parallelism = destination.getParallelism();
+ switch (destination.getPartitioningScheme()) {
+ case SHUFFLE:
+ pi.processEvent(event, counter % parallelism);
+ break;
+ case GROUP_BY_KEY:
+ HashCodeBuilder hb = new HashCodeBuilder();
+ hb.append(event.getKey());
+ int key = hb.build() % parallelism;
+ pi.processEvent(event, key);
+ break;
+ case BROADCAST:
+ for (int p = 0; p < parallelism; p++) {
+ pi.processEvent(event, p);
}
+ break;
+ }
}
+ }
- public void addDestination(StreamDestination destination) {
- this.destinations.add(destination);
- if (maxCounter <= 0) maxCounter = 1;
- maxCounter *= destination.getParallelism();
- }
+ public void addDestination(StreamDestination destination) {
+ this.destinations.add(destination);
+ if (maxCounter <= 0)
+ maxCounter = 1;
+ maxCounter *= destination.getParallelism();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java
index 675b4ac..e7fddbd 100644
--- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java
+++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java
@@ -27,18 +27,21 @@ package com.yahoo.labs.samoa.topology.impl;
import com.yahoo.labs.samoa.topology.AbstractTopology;
public class SimpleTopology extends AbstractTopology {
- SimpleTopology(String name) {
- super(name);
- }
+ SimpleTopology(String name) {
+ super(name);
+ }
- public void run() {
- if (this.getEntranceProcessingItems() == null)
- throw new IllegalStateException("You need to set entrance PI before running the topology.");
- if (this.getEntranceProcessingItems().size() != 1)
- throw new IllegalStateException("SimpleTopology supports 1 entrance PI only. Number of entrance PIs is "+this.getEntranceProcessingItems().size());
-
- SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) this.getEntranceProcessingItems().toArray()[0];
- entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple mode
- entrancePi.startSendingEvents();
- }
+ public void run() {
+ if (this.getEntranceProcessingItems() == null)
+ throw new IllegalStateException("You need to set entrance PI before running the topology.");
+ if (this.getEntranceProcessingItems().size() != 1)
+ throw new IllegalStateException("SimpleTopology supports 1 entrance PI only. Number of entrance PIs is "
+ + this.getEntranceProcessingItems().size());
+
+ SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) this.getEntranceProcessingItems()
+ .toArray()[0];
+ entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple
+ // mode
+ entrancePi.startSendingEvents();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
index 9bf1c2d..d3e54a8 100644
--- a/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
+++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
@@ -24,64 +24,63 @@ import org.junit.Test;
public class AlgosTest {
+ @Test
+ public void testVHTLocal() throws Exception {
- @Test
- public void testVHTLocal() throws Exception {
+ TestParams vhtConfig = new TestParams.Builder()
+ .inputInstances(200_000)
+ .samplingSize(20_000)
+ .evaluationInstances(200_000)
+ .classifiedInstances(200_000)
+ .classificationsCorrect(75f)
+ .kappaStat(0f)
+ .kappaTempStat(0f)
+ .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE)
+ .resultFilePollTimeout(10)
+ .prePollWait(10)
+ .taskClassName(LocalDoTask.class.getName())
+ .build();
+ TestUtils.test(vhtConfig);
- TestParams vhtConfig = new TestParams.Builder()
- .inputInstances(200_000)
- .samplingSize(20_000)
- .evaluationInstances(200_000)
- .classifiedInstances(200_000)
- .classificationsCorrect(75f)
- .kappaStat(0f)
- .kappaTempStat(0f)
- .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE)
- .resultFilePollTimeout(10)
- .prePollWait(10)
- .taskClassName(LocalDoTask.class.getName())
- .build();
- TestUtils.test(vhtConfig);
+ }
- }
+ @Test
+ public void testBaggingLocal() throws Exception {
+ TestParams baggingConfig = new TestParams.Builder()
+ .inputInstances(200_000)
+ .samplingSize(20_000)
+ .evaluationInstances(180_000)
+ .classifiedInstances(210_000)
+ .classificationsCorrect(60f)
+ .kappaStat(0f)
+ .kappaTempStat(0f)
+ .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE)
+ .prePollWait(10)
+ .resultFilePollTimeout(10)
+ .taskClassName(LocalDoTask.class.getName())
+ .build();
+ TestUtils.test(baggingConfig);
- @Test
- public void testBaggingLocal() throws Exception {
- TestParams baggingConfig = new TestParams.Builder()
- .inputInstances(200_000)
- .samplingSize(20_000)
- .evaluationInstances(180_000)
- .classifiedInstances(210_000)
- .classificationsCorrect(60f)
- .kappaStat(0f)
- .kappaTempStat(0f)
- .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE)
- .prePollWait(10)
- .resultFilePollTimeout(10)
- .taskClassName(LocalDoTask.class.getName())
- .build();
- TestUtils.test(baggingConfig);
+ }
- }
+ @Test
+ public void testNaiveBayesLocal() throws Exception {
- @Test
- public void testNaiveBayesLocal() throws Exception {
+ TestParams vhtConfig = new TestParams.Builder()
+ .inputInstances(200_000)
+ .samplingSize(20_000)
+ .evaluationInstances(200_000)
+ .classifiedInstances(200_000)
+ .classificationsCorrect(65f)
+ .kappaStat(0f)
+ .kappaTempStat(0f)
+ .cliStringTemplate(TestParams.Templates.PREQEVAL_NAIVEBAYES_HYPERPLANE)
+ .resultFilePollTimeout(10)
+ .prePollWait(10)
+ .taskClassName(LocalDoTask.class.getName())
+ .build();
+ TestUtils.test(vhtConfig);
- TestParams vhtConfig = new TestParams.Builder()
- .inputInstances(200_000)
- .samplingSize(20_000)
- .evaluationInstances(200_000)
- .classifiedInstances(200_000)
- .classificationsCorrect(65f)
- .kappaStat(0f)
- .kappaTempStat(0f)
- .cliStringTemplate(TestParams.Templates.PREQEVAL_NAIVEBAYES_HYPERPLANE)
- .resultFilePollTimeout(10)
- .prePollWait(10)
- .taskClassName(LocalDoTask.class.getName())
- .build();
- TestUtils.test(vhtConfig);
-
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java
index 02a9295..bfd6fe1 100644
--- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java
+++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java
@@ -36,61 +36,64 @@ import com.yahoo.labs.samoa.topology.Topology;
/**
* @author Anh Thu Vu
- *
+ *
*/
public class SimpleComponentFactoryTest {
- @Tested private SimpleComponentFactory factory;
- @Mocked private Processor processor, processorReplica;
- @Mocked private EntranceProcessor entranceProcessor;
-
- private final int parallelism = 3;
- private final String topoName = "TestTopology";
-
+ @Tested
+ private SimpleComponentFactory factory;
+ @Mocked
+ private Processor processor, processorReplica;
+ @Mocked
+ private EntranceProcessor entranceProcessor;
+
+ private final int parallelism = 3;
+ private final String topoName = "TestTopology";
+
+ @Before
+ public void setUp() throws Exception {
+ factory = new SimpleComponentFactory();
+ }
+
+ @Test
+ public void testCreatePiNoParallelism() {
+ ProcessingItem pi = factory.createPi(processor);
+ assertNotNull("ProcessingItem created is null.", pi);
+ assertEquals("ProcessingItem created is not a SimpleProcessingItem.", SimpleProcessingItem.class, pi.getClass());
+ assertEquals("Parallelism of PI is not 1", 1, pi.getParallelism(), 0);
+ }
+
+ @Test
+ public void testCreatePiWithParallelism() {
+ ProcessingItem pi = factory.createPi(processor, parallelism);
+ assertNotNull("ProcessingItem created is null.", pi);
+ assertEquals("ProcessingItem created is not a SimpleProcessingItem.", SimpleProcessingItem.class, pi.getClass());
+ assertEquals("Parallelism of PI is not ", parallelism, pi.getParallelism(), 0);
+ }
+
+ @Test
+ public void testCreateStream() {
+ ProcessingItem pi = factory.createPi(processor);
+
+ Stream stream = factory.createStream(pi);
+ assertNotNull("Stream created is null", stream);
+ assertEquals("Stream created is not a SimpleStream.", SimpleStream.class, stream.getClass());
+ }
- @Before
- public void setUp() throws Exception {
- factory = new SimpleComponentFactory();
- }
+ @Test
+ public void testCreateTopology() {
+ Topology topology = factory.createTopology(topoName);
+ assertNotNull("Topology created is null.", topology);
+ assertEquals("Topology created is not a SimpleTopology.", SimpleTopology.class, topology.getClass());
+ }
- @Test
- public void testCreatePiNoParallelism() {
- ProcessingItem pi = factory.createPi(processor);
- assertNotNull("ProcessingItem created is null.",pi);
- assertEquals("ProcessingItem created is not a SimpleProcessingItem.",SimpleProcessingItem.class,pi.getClass());
- assertEquals("Parallelism of PI is not 1",1,pi.getParallelism(),0);
- }
-
- @Test
- public void testCreatePiWithParallelism() {
- ProcessingItem pi = factory.createPi(processor,parallelism);
- assertNotNull("ProcessingItem created is null.",pi);
- assertEquals("ProcessingItem created is not a SimpleProcessingItem.",SimpleProcessingItem.class,pi.getClass());
- assertEquals("Parallelism of PI is not ",parallelism,pi.getParallelism(),0);
- }
-
- @Test
- public void testCreateStream() {
- ProcessingItem pi = factory.createPi(processor);
-
- Stream stream = factory.createStream(pi);
- assertNotNull("Stream created is null",stream);
- assertEquals("Stream created is not a SimpleStream.",SimpleStream.class,stream.getClass());
- }
-
- @Test
- public void testCreateTopology() {
- Topology topology = factory.createTopology(topoName);
- assertNotNull("Topology created is null.",topology);
- assertEquals("Topology created is not a SimpleTopology.",SimpleTopology.class,topology.getClass());
- }
-
- @Test
- public void testCreateEntrancePi() {
- EntranceProcessingItem entrancePi = factory.createEntrancePi(entranceProcessor);
- assertNotNull("EntranceProcessingItem created is null.",entrancePi);
- assertEquals("EntranceProcessingItem created is not a SimpleEntranceProcessingItem.",SimpleEntranceProcessingItem.class,entrancePi.getClass());
- assertSame("EntranceProcessor is not set correctly.",entranceProcessor, entrancePi.getProcessor());
- }
+ @Test
+ public void testCreateEntrancePi() {
+ EntranceProcessingItem entrancePi = factory.createEntrancePi(entranceProcessor);
+ assertNotNull("EntranceProcessingItem created is null.", entrancePi);
+ assertEquals("EntranceProcessingItem created is not a SimpleEntranceProcessingItem.",
+ SimpleEntranceProcessingItem.class, entrancePi.getClass());
+ assertSame("EntranceProcessor is not set correctly.", entranceProcessor, entrancePi.getProcessor());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java
index c4649ed..23b38b4 100644
--- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java
+++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java
@@ -29,29 +29,32 @@ import org.junit.Test;
/**
* @author Anh Thu Vu
- *
+ *
*/
public class SimpleEngineTest {
- @Tested private SimpleEngine unused;
- @Mocked private SimpleTopology topology;
- @Mocked private Runtime mockedRuntime;
-
- @Test
- public void testSubmitTopology() {
- new NonStrictExpectations() {
- {
- Runtime.getRuntime();
- result=mockedRuntime;
- mockedRuntime.exit(0);
- }
- };
- SimpleEngine.submitTopology(topology);
- new Verifications() {
- {
- topology.run();
- }
- };
- }
+ @Tested
+ private SimpleEngine unused;
+ @Mocked
+ private SimpleTopology topology;
+ @Mocked
+ private Runtime mockedRuntime;
+
+ @Test
+ public void testSubmitTopology() {
+ new NonStrictExpectations() {
+ {
+ Runtime.getRuntime();
+ result = mockedRuntime;
+ mockedRuntime.exit(0);
+ }
+ };
+ SimpleEngine.submitTopology(topology);
+ new Verifications() {
+ {
+ topology.run();
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
index 41ae22b..0c1e475 100644
--- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
+++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
@@ -36,118 +36,137 @@ import com.yahoo.labs.samoa.topology.Stream;
/**
* @author Anh Thu Vu
- *
+ *
*/
public class SimpleEntranceProcessingItemTest {
- @Tested private SimpleEntranceProcessingItem entrancePi;
-
- @Mocked private EntranceProcessor entranceProcessor;
- @Mocked private Stream outputStream, anotherStream;
- @Mocked private ContentEvent event;
-
- @Mocked private Thread unused;
-
- /**
- * @throws java.lang.Exception
- */
- @Before
- public void setUp() throws Exception {
- entrancePi = new SimpleEntranceProcessingItem(entranceProcessor);
- }
-
- @Test
- public void testContructor() {
- assertSame("EntranceProcessor is not set correctly.",entranceProcessor,entrancePi.getProcessor());
- }
-
- @Test
- public void testSetOutputStream() {
- entrancePi.setOutputStream(outputStream);
- assertSame("OutputStream is not set correctly.",outputStream,entrancePi.getOutputStream());
- }
-
- @Test
- public void testSetOutputStreamRepeate() {
- entrancePi.setOutputStream(outputStream);
- entrancePi.setOutputStream(outputStream);
- assertSame("OutputStream is not set correctly.",outputStream,entrancePi.getOutputStream());
- }
-
- @Test(expected=IllegalStateException.class)
- public void testSetOutputStreamError() {
- entrancePi.setOutputStream(outputStream);
- entrancePi.setOutputStream(anotherStream);
- }
-
- @Test
- public void testInjectNextEventSuccess() {
- entrancePi.setOutputStream(outputStream);
- new StrictExpectations() {
- {
- entranceProcessor.hasNext();
- result=true;
-
- entranceProcessor.nextEvent();
- result=event;
- }
- };
- entrancePi.injectNextEvent();
- new Verifications() {
- {
- outputStream.put(event);
- }
- };
- }
-
- @Test
- public void testStartSendingEvents() {
- entrancePi.setOutputStream(outputStream);
- new StrictExpectations() {
- {
- for (int i=0; i<1; i++) {
- entranceProcessor.isFinished(); result=false;
- entranceProcessor.hasNext(); result=false;
- }
-
- for (int i=0; i<5; i++) {
- entranceProcessor.isFinished(); result=false;
- entranceProcessor.hasNext(); result=true;
- entranceProcessor.nextEvent(); result=event;
- outputStream.put(event);
- }
-
- for (int i=0; i<2; i++) {
- entranceProcessor.isFinished(); result=false;
- entranceProcessor.hasNext(); result=false;
- }
-
- for (int i=0; i<5; i++) {
- entranceProcessor.isFinished(); result=false;
- entranceProcessor.hasNext(); result=true;
- entranceProcessor.nextEvent(); result=event;
- outputStream.put(event);
- }
-
- entranceProcessor.isFinished(); result=true; times=1;
- entranceProcessor.hasNext(); times=0;
- }
- };
- entrancePi.startSendingEvents();
- new Verifications() {
- {
- try {
- Thread.sleep(anyInt); times=3;
- } catch (InterruptedException e) {
-
- }
- }
- };
- }
-
- @Test(expected=IllegalStateException.class)
- public void testStartSendingEventsError() {
- entrancePi.startSendingEvents();
- }
+ @Tested
+ private SimpleEntranceProcessingItem entrancePi;
+
+ @Mocked
+ private EntranceProcessor entranceProcessor;
+ @Mocked
+ private Stream outputStream, anotherStream;
+ @Mocked
+ private ContentEvent event;
+
+ @Mocked
+ private Thread unused;
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @Before
+ public void setUp() throws Exception {
+ entrancePi = new SimpleEntranceProcessingItem(entranceProcessor);
+ }
+
+ @Test
+ public void testContructor() {
+ assertSame("EntranceProcessor is not set correctly.", entranceProcessor, entrancePi.getProcessor());
+ }
+
+ @Test
+ public void testSetOutputStream() {
+ entrancePi.setOutputStream(outputStream);
+ assertSame("OutputStream is not set correctly.", outputStream, entrancePi.getOutputStream());
+ }
+
+ @Test
+ public void testSetOutputStreamRepeate() {
+ entrancePi.setOutputStream(outputStream);
+ entrancePi.setOutputStream(outputStream);
+ assertSame("OutputStream is not set correctly.", outputStream, entrancePi.getOutputStream());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testSetOutputStreamError() {
+ entrancePi.setOutputStream(outputStream);
+ entrancePi.setOutputStream(anotherStream);
+ }
+
+ @Test
+ public void testInjectNextEventSuccess() {
+ entrancePi.setOutputStream(outputStream);
+ new StrictExpectations() {
+ {
+ entranceProcessor.hasNext();
+ result = true;
+
+ entranceProcessor.nextEvent();
+ result = event;
+ }
+ };
+ entrancePi.injectNextEvent();
+ new Verifications() {
+ {
+ outputStream.put(event);
+ }
+ };
+ }
+
+ @Test
+ public void testStartSendingEvents() {
+ entrancePi.setOutputStream(outputStream);
+ new StrictExpectations() {
+ {
+ for (int i = 0; i < 1; i++) {
+ entranceProcessor.isFinished();
+ result = false;
+ entranceProcessor.hasNext();
+ result = false;
+ }
+
+ for (int i = 0; i < 5; i++) {
+ entranceProcessor.isFinished();
+ result = false;
+ entranceProcessor.hasNext();
+ result = true;
+ entranceProcessor.nextEvent();
+ result = event;
+ outputStream.put(event);
+ }
+
+ for (int i = 0; i < 2; i++) {
+ entranceProcessor.isFinished();
+ result = false;
+ entranceProcessor.hasNext();
+ result = false;
+ }
+
+ for (int i = 0; i < 5; i++) {
+ entranceProcessor.isFinished();
+ result = false;
+ entranceProcessor.hasNext();
+ result = true;
+ entranceProcessor.nextEvent();
+ result = event;
+ outputStream.put(event);
+ }
+
+ entranceProcessor.isFinished();
+ result = true;
+ times = 1;
+ entranceProcessor.hasNext();
+ times = 0;
+ }
+ };
+ entrancePi.startSendingEvents();
+ new Verifications() {
+ {
+ try {
+ Thread.sleep(anyInt);
+ times = 3;
+ } catch (InterruptedException e) {
+
+ }
+ }
+ };
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testStartSendingEventsError() {
+ entrancePi.startSendingEvents();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java
index a4a288a..caa82bf 100644
--- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java
+++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java
@@ -40,81 +40,85 @@ import com.yahoo.labs.samoa.utils.StreamDestination;
/**
* @author Anh Thu Vu
- *
+ *
*/
public class SimpleProcessingItemTest {
- @Tested private SimpleProcessingItem pi;
-
- @Mocked private Processor processor;
- @Mocked private SimpleStream stream;
- @Mocked private StreamDestination destination;
- @Mocked private ContentEvent event;
-
- private final int parallelism = 4;
- private final int counter = 2;
-
-
- @Before
- public void setUp() throws Exception {
- pi = new SimpleProcessingItem(processor, parallelism);
- }
-
- @Test
- public void testConstructor() {
- assertSame("Processor was not set correctly.",processor,pi.getProcessor());
- assertEquals("Parallelism was not set correctly.",parallelism,pi.getParallelism(),0);
- }
-
- @Test
- public void testConnectInputShuffleStream() {
- new Expectations() {
- {
- destination = new StreamDestination(pi, parallelism, PartitioningScheme.SHUFFLE);
- stream.addDestination(destination);
- }
- };
- pi.connectInputShuffleStream(stream);
- }
-
- @Test
- public void testConnectInputKeyStream() {
- new Expectations() {
- {
- destination = new StreamDestination(pi, parallelism, PartitioningScheme.GROUP_BY_KEY);
- stream.addDestination(destination);
- }
- };
- pi.connectInputKeyStream(stream);
- }
-
- @Test
- public void testConnectInputAllStream() {
- new Expectations() {
- {
- destination = new StreamDestination(pi, parallelism, PartitioningScheme.BROADCAST);
- stream.addDestination(destination);
- }
- };
- pi.connectInputAllStream(stream);
- }
-
- @Test
- public void testProcessEvent() {
- new Expectations() {
- {
- for (int i=0; i<parallelism; i++) {
- processor.newProcessor(processor);
- result=processor;
-
- processor.onCreate(anyInt);
- }
-
- processor.process(event);
- }
- };
- pi.processEvent(event, counter);
-
- }
+ @Tested
+ private SimpleProcessingItem pi;
+
+ @Mocked
+ private Processor processor;
+ @Mocked
+ private SimpleStream stream;
+ @Mocked
+ private StreamDestination destination;
+ @Mocked
+ private ContentEvent event;
+
+ private final int parallelism = 4;
+ private final int counter = 2;
+
+ @Before
+ public void setUp() throws Exception {
+ pi = new SimpleProcessingItem(processor, parallelism);
+ }
+
+ @Test
+ public void testConstructor() {
+ assertSame("Processor was not set correctly.", processor, pi.getProcessor());
+ assertEquals("Parallelism was not set correctly.", parallelism, pi.getParallelism(), 0);
+ }
+
+ @Test
+ public void testConnectInputShuffleStream() {
+ new Expectations() {
+ {
+ destination = new StreamDestination(pi, parallelism, PartitioningScheme.SHUFFLE);
+ stream.addDestination(destination);
+ }
+ };
+ pi.connectInputShuffleStream(stream);
+ }
+
+ @Test
+ public void testConnectInputKeyStream() {
+ new Expectations() {
+ {
+ destination = new StreamDestination(pi, parallelism, PartitioningScheme.GROUP_BY_KEY);
+ stream.addDestination(destination);
+ }
+ };
+ pi.connectInputKeyStream(stream);
+ }
+
+ @Test
+ public void testConnectInputAllStream() {
+ new Expectations() {
+ {
+ destination = new StreamDestination(pi, parallelism, PartitioningScheme.BROADCAST);
+ stream.addDestination(destination);
+ }
+ };
+ pi.connectInputAllStream(stream);
+ }
+
+ @Test
+ public void testProcessEvent() {
+ new Expectations() {
+ {
+ for (int i = 0; i < parallelism; i++) {
+ processor.newProcessor(processor);
+ result = processor;
+
+ processor.onCreate(anyInt);
+ }
+
+ processor.process(event);
+ }
+ };
+ pi.processEvent(event, counter);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java
index 2a625b5..c8f6c5d 100644
--- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java
+++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java
@@ -40,72 +40,82 @@ import com.yahoo.labs.samoa.utils.StreamDestination;
/**
* @author Anh Thu Vu
- *
+ *
*/
@RunWith(Parameterized.class)
public class SimpleStreamTest {
- @Tested private SimpleStream stream;
-
- @Mocked private SimpleProcessingItem sourcePi, destPi;
- @Mocked private ContentEvent event;
- @Mocked private StreamDestination destination;
+ @Tested
+ private SimpleStream stream;
+
+ @Mocked
+ private SimpleProcessingItem sourcePi, destPi;
+ @Mocked
+ private ContentEvent event;
+ @Mocked
+ private StreamDestination destination;
+
+ private final String eventKey = "eventkey";
+ private final int parallelism;
+ private final PartitioningScheme scheme;
+
+ @Parameters
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][] {
+ { 2, PartitioningScheme.SHUFFLE },
+ { 3, PartitioningScheme.GROUP_BY_KEY },
+ { 4, PartitioningScheme.BROADCAST }
+ });
+ }
+
+ public SimpleStreamTest(int parallelism, PartitioningScheme scheme) {
+ this.parallelism = parallelism;
+ this.scheme = scheme;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ stream = new SimpleStream(sourcePi);
+ stream.addDestination(destination);
+ }
- private final String eventKey = "eventkey";
- private final int parallelism;
- private final PartitioningScheme scheme;
-
-
- @Parameters
- public static Collection<Object[]> generateParameters() {
- return Arrays.asList(new Object[][] {
- { 2, PartitioningScheme.SHUFFLE },
- { 3, PartitioningScheme.GROUP_BY_KEY },
- { 4, PartitioningScheme.BROADCAST }
- });
- }
-
- public SimpleStreamTest(int parallelism, PartitioningScheme scheme) {
- this.parallelism = parallelism;
- this.scheme = scheme;
- }
-
- @Before
- public void setUp() throws Exception {
- stream = new SimpleStream(sourcePi);
- stream.addDestination(destination);
- }
+ @Test
+ public void testPut() {
+ new NonStrictExpectations() {
+ {
+ event.getKey();
+ result = eventKey;
+ destination.getProcessingItem();
+ result = destPi;
+ destination.getPartitioningScheme();
+ result = scheme;
+ destination.getParallelism();
+ result = parallelism;
- @Test
- public void testPut() {
- new NonStrictExpectations() {
- {
- event.getKey(); result=eventKey;
- destination.getProcessingItem(); result=destPi;
- destination.getPartitioningScheme(); result=scheme;
- destination.getParallelism(); result=parallelism;
-
- }
- };
- switch(this.scheme) {
- case SHUFFLE: case GROUP_BY_KEY:
- new Expectations() {
- {
- // TODO: restrict the range of counter value
- destPi.processEvent(event, anyInt); times=1;
- }
- };
- break;
- case BROADCAST:
- new Expectations() {
- {
- // TODO: restrict the range of counter value
- destPi.processEvent(event, anyInt); times=parallelism;
- }
- };
- break;
- }
- stream.put(event);
- }
+ }
+ };
+ switch (this.scheme) {
+ case SHUFFLE:
+ case GROUP_BY_KEY:
+ new Expectations() {
+ {
+ // TODO: restrict the range of counter value
+ destPi.processEvent(event, anyInt);
+ times = 1;
+ }
+ };
+ break;
+ case BROADCAST:
+ new Expectations() {
+ {
+ // TODO: restrict the range of counter value
+ destPi.processEvent(event, anyInt);
+ times = parallelism;
+ }
+ };
+ break;
+ }
+ stream.put(event);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java
index 2423778..418ad14 100644
--- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java
+++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java
@@ -31,63 +31,64 @@ import org.junit.Test;
* #L%
*/
-
-
import com.yahoo.labs.samoa.core.EntranceProcessor;
import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
/**
* @author Anh Thu Vu
- *
+ *
*/
public class SimpleTopologyTest {
- @Tested private SimpleTopology topology;
-
- @Mocked private SimpleEntranceProcessingItem entrancePi;
- @Mocked private EntranceProcessor entranceProcessor;
-
- @Before
- public void setUp() throws Exception {
- topology = new SimpleTopology("TestTopology");
- }
-
- @Test
- public void testAddEntrancePi() {
- topology.addEntranceProcessingItem(entrancePi);
-
- Set<EntranceProcessingItem> entrancePIs = topology.getEntranceProcessingItems();
- assertNotNull("Set of entrance PIs is null.",entrancePIs);
- assertEquals("Number of entrance PI in SimpleTopology must be 1",1,entrancePIs.size());
- assertSame("Entrance PI was not set correctly.",entrancePi,entrancePIs.toArray()[0]);
- // TODO: verify that entrance PI is in the set of ProcessingItems
- // Need to access topology's set of PIs (getProcessingItems() method)
- }
-
- @Test
- public void testRun() {
- topology.addEntranceProcessingItem(entrancePi);
-
- new NonStrictExpectations() {
- {
- entrancePi.getProcessor();
- result=entranceProcessor;
-
- }
- };
-
- new Expectations() {
- {
- entranceProcessor.onCreate(anyInt);
- entrancePi.startSendingEvents();
- }
- };
- topology.run();
- }
-
- @Test(expected=IllegalStateException.class)
- public void testRunWithoutEntrancePI() {
- topology.run();
- }
+ @Tested
+ private SimpleTopology topology;
+
+ @Mocked
+ private SimpleEntranceProcessingItem entrancePi;
+ @Mocked
+ private EntranceProcessor entranceProcessor;
+
+ @Before
+ public void setUp() throws Exception {
+ topology = new SimpleTopology("TestTopology");
+ }
+
+ @Test
+ public void testAddEntrancePi() {
+ topology.addEntranceProcessingItem(entrancePi);
+
+ Set<EntranceProcessingItem> entrancePIs = topology.getEntranceProcessingItems();
+ assertNotNull("Set of entrance PIs is null.", entrancePIs);
+ assertEquals("Number of entrance PI in SimpleTopology must be 1", 1, entrancePIs.size());
+ assertSame("Entrance PI was not set correctly.", entrancePi, entrancePIs.toArray()[0]);
+ // TODO: verify that entrance PI is in the set of ProcessingItems
+ // Need to access topology's set of PIs (getProcessingItems() method)
+ }
+
+ @Test
+ public void testRun() {
+ topology.addEntranceProcessingItem(entrancePi);
+
+ new NonStrictExpectations() {
+ {
+ entrancePi.getProcessor();
+ result = entranceProcessor;
+
+ }
+ };
+
+ new Expectations() {
+ {
+ entranceProcessor.onCreate(anyInt);
+ entrancePi.startSendingEvents();
+ }
+ };
+ topology.run();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testRunWithoutEntrancePI() {
+ topology.run();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java
----------------------------------------------------------------------
diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java
index 33299ac..b627416 100644
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java
@@ -40,58 +40,59 @@ import com.yahoo.labs.samoa.topology.Topology;
*/
public class S4ComponentFactory implements ComponentFactory {
- public static final Logger logger = LoggerFactory.getLogger(S4ComponentFactory.class);
- protected S4DoTask app;
+ public static final Logger logger = LoggerFactory.getLogger(S4ComponentFactory.class);
+ protected S4DoTask app;
- @Override
- public ProcessingItem createPi(Processor processor, int paralellism) {
- S4ProcessingItem processingItem = new S4ProcessingItem(app);
- // TODO refactor how to set the paralellism level
- processingItem.setParalellismLevel(paralellism);
- processingItem.setProcessor(processor);
+ @Override
+ public ProcessingItem createPi(Processor processor, int paralellism) {
+ S4ProcessingItem processingItem = new S4ProcessingItem(app);
+ // TODO refactor how to set the paralellism level
+ processingItem.setParalellismLevel(paralellism);
+ processingItem.setProcessor(processor);
- return processingItem;
- }
+ return processingItem;
+ }
- @Override
- public ProcessingItem createPi(Processor processor) {
- return this.createPi(processor, 1);
- }
+ @Override
+ public ProcessingItem createPi(Processor processor) {
+ return this.createPi(processor, 1);
+ }
- @Override
- public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) {
- // TODO Create source Entry processing item that connects to an external stream
- S4EntranceProcessingItem entrancePi = new S4EntranceProcessingItem(entranceProcessor, app);
- entrancePi.setParallelism(1); // FIXME should not be set to 1 statically
- return entrancePi;
- }
+ @Override
+ public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) {
+ // TODO Create source Entry processing item that connects to an external
+ // stream
+ S4EntranceProcessingItem entrancePi = new S4EntranceProcessingItem(entranceProcessor, app);
+ entrancePi.setParallelism(1); // FIXME should not be set to 1 statically
+ return entrancePi;
+ }
- @Override
- public Stream createStream(IProcessingItem sourcePi) {
- S4Stream aStream = new S4Stream(app);
- return aStream;
- }
+ @Override
+ public Stream createStream(IProcessingItem sourcePi) {
+ S4Stream aStream = new S4Stream(app);
+ return aStream;
+ }
- @Override
- public Topology createTopology(String topoName) {
- return new S4Topology(topoName);
- }
+ @Override
+ public Topology createTopology(String topoName) {
+ return new S4Topology(topoName);
+ }
- /**
- * Initialization method.
- *
- * @param evalTask
- */
- public void init(String evalTask) {
- // Task is initiated in the DoTaskApp
- }
+ /**
+ * Initialization method.
+ *
+ * @param evalTask
+ */
+ public void init(String evalTask) {
+ // Task is initiated in the DoTaskApp
+ }
- /**
- * Sets S4 application.
- *
- * @param app
- */
- public void setApp(S4DoTask app) {
- this.app = app;
- }
+ /**
+ * Sets S4 application.
+ *
+ * @param app
+ */
+ public void setApp(S4DoTask app) {
+ this.app = app;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java
----------------------------------------------------------------------
diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java
index 0f474a4..3691a82 100644
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java
@@ -56,208 +56,213 @@ import com.google.inject.name.Named;
*/
final public class S4DoTask extends App {
- private final Logger logger = LoggerFactory.getLogger(S4DoTask.class);
- Task task;
+ private final Logger logger = LoggerFactory.getLogger(S4DoTask.class);
+ Task task;
- @Inject @Named("evalTask") public String evalTask;
+ @Inject
+ @Named("evalTask")
+ public String evalTask;
- public S4DoTask() {
- super();
- }
-
- /** The engine. */
- protected ComponentFactory componentFactory;
-
- /**
- * Gets the factory.
- *
- * @return the factory
- */
- public ComponentFactory getFactory() {
- return componentFactory;
- }
+ public S4DoTask() {
+ super();
+ }
- /**
- * Sets the factory.
- *
- * @param factory
- * the new factory
- */
- public void setFactory(ComponentFactory factory) {
- this.componentFactory = factory;
- }
+ /** The component factory. */
+ protected ComponentFactory componentFactory;
- /*
- * Build the application
- *
- * @see org.apache.s4.core.App#onInit()
- */
- /*
- * (non-Javadoc)
- *
- * @see org.apache.s4.core.App#onInit()
- */
- @Override
- protected void onInit() {
- logger.info("DoTaskApp onInit");
- // ConsoleReporters prints S4 metrics
- // MetricsRegistry mr = new MetricsRegistry();
- //
- // CsvReporter.enable(new File(System.getProperty("user.home")
- // + "/monitor/"), 10, TimeUnit.SECONDS);
- // ConsoleReporter.enable(10, TimeUnit.SECONDS);
- try {
- System.err.println();
- System.err.println(Globals.getWorkbenchInfoString());
- System.err.println();
+ /**
+ * Gets the factory.
+ *
+ * @return the factory
+ */
+ public ComponentFactory getFactory() {
+ return componentFactory;
+ }
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- S4ComponentFactory factory = new S4ComponentFactory();
- factory.setApp(this);
+ /**
+ * Sets the factory.
+ *
+ * @param factory
+ * the new factory
+ */
+ public void setFactory(ComponentFactory factory) {
+ this.componentFactory = factory;
+ }
- // logger.debug("LC {}", lc);
+ /*
+ * Build the application
+ *
+ * @see org.apache.s4.core.App#onInit()
+ */
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.s4.core.App#onInit()
+ */
+ @Override
+ protected void onInit() {
+ logger.info("DoTaskApp onInit");
+ // ConsoleReporters prints S4 metrics
+ // MetricsRegistry mr = new MetricsRegistry();
+ //
+ // CsvReporter.enable(new File(System.getProperty("user.home")
+ // + "/monitor/"), 10, TimeUnit.SECONDS);
+ // ConsoleReporter.enable(10, TimeUnit.SECONDS);
+ try {
+ System.err.println();
+ System.err.println(Globals.getWorkbenchInfoString());
+ System.err.println();
- // task = TaskProvider.getTask(evalTask);
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ S4ComponentFactory factory = new S4ComponentFactory();
+ factory.setApp(this);
- // EXAMPLE OPTIONS
- // -l Clustream -g Clustream -i 100000 -s (RandomRBFGeneratorEvents -K
- // 5 -N 0.0)
- // String[] args = new String[] {evalTask,"-l", "Clustream","-g",
- // "Clustream", "-i", "100000", "-s", "(RamdomRBFGeneratorsEvents",
- // "-K", "5", "-N", "0.0)"};
- // String[] args = new String[] { evalTask, "-l", "clustream.Clustream",
- // "-g", "clustream.Clustream", "-i", "100000", "-s",
- // "(RandomRBFGeneratorEvents", "-K", "5", "-N", "0.0)" };
- logger.debug("PARAMETERS {}", evalTask);
- // params = params.replace(":", " ");
- List<String> parameters = new ArrayList<String>();
- // parameters.add(evalTask);
- try {
- parameters.addAll(Arrays.asList(URLDecoder.decode(evalTask, "UTF-8").split(" ")));
- } catch (UnsupportedEncodingException ex) {
- ex.printStackTrace();
- }
- String[] args = parameters.toArray(new String[0]);
- Option[] extraOptions = new Option[] {};
- // build a single string by concatenating cli options
- StringBuilder cliString = new StringBuilder();
- for (int i = 0; i < args.length; i++) {
- cliString.append(" ").append(args[i]);
- }
+ // logger.debug("LC {}", lc);
- // parse options
- try {
- task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, extraOptions);
- task.setFactory(factory);
- task.init();
- } catch (Exception e) {
- e.printStackTrace();
- }
+ // task = TaskProvider.getTask(evalTask);
+ // EXAMPLE OPTIONS
+ // -l Clustream -g Clustream -i 100000 -s (RandomRBFGeneratorEvents -K
+ // 5 -N 0.0)
+ // String[] args = new String[] {evalTask,"-l", "Clustream","-g",
+ // "Clustream", "-i", "100000", "-s", "(RamdomRBFGeneratorsEvents",
+ // "-K", "5", "-N", "0.0)"};
+ // String[] args = new String[] { evalTask, "-l", "clustream.Clustream",
+ // "-g", "clustream.Clustream", "-i", "100000", "-s",
+ // "(RandomRBFGeneratorEvents", "-K", "5", "-N", "0.0)" };
+ logger.debug("PARAMETERS {}", evalTask);
+ // params = params.replace(":", " ");
+ List<String> parameters = new ArrayList<String>();
+ // parameters.add(evalTask);
+ try {
+ parameters.addAll(Arrays.asList(URLDecoder.decode(evalTask, "UTF-8").split(" ")));
+ } catch (UnsupportedEncodingException ex) {
+ ex.printStackTrace();
+ }
+ String[] args = parameters.toArray(new String[0]);
+ Option[] extraOptions = new Option[] {};
+ // build a single string by concatenating cli options
+ StringBuilder cliString = new StringBuilder();
+ for (int i = 0; i < args.length; i++) {
+ cliString.append(" ").append(args[i]);
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.s4.core.App#onStart()
- */
- @Override
- protected void onStart() {
- logger.info("Starting DoTaskApp... App Partition [{}]", this.getPartitionId());
- // <<<<<<< HEAD Task doesn't have start in latest storm-impl
- // TODO change the way the app starts
- // if (this.getPartitionId() == 0)
- S4Topology s4topology = (S4Topology) getTask().getTopology();
- S4EntranceProcessingItem epi = (S4EntranceProcessingItem) s4topology.getEntranceProcessingItem();
- while (epi.injectNextEvent())
- // inject events from the EntrancePI
- ;
+ // parse options
+ try {
+ task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, extraOptions);
+ task.setFactory(factory);
+ task.init();
+ } catch (Exception e) {
+ e.printStackTrace();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.s4.core.App#onClose()
- */
- @Override
- protected void onClose() {
- System.out.println("Closing DoTaskApp...");
+ }
- }
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.s4.core.App#onStart()
+ */
+ @Override
+ protected void onStart() {
+ logger.info("Starting DoTaskApp... App Partition [{}]", this.getPartitionId());
+ // Task doesn't have start in latest storm-impl
+ // TODO change the way the app starts
+ // if (this.getPartitionId() == 0)
+ S4Topology s4topology = (S4Topology) getTask().getTopology();
+ S4EntranceProcessingItem epi = (S4EntranceProcessingItem) s4topology.getEntranceProcessingItem();
+ while (epi.injectNextEvent())
+ // inject events from the EntrancePI
+ ;
+ }
- /**
- * Gets the task.
- *
- * @return the task
- */
- public Task getTask() {
- return task;
- }
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.s4.core.App#onClose()
+ */
+ @Override
+ protected void onClose() {
+ System.out.println("Closing DoTaskApp...");
- // These methods are protected in App and can not be accessed from outside.
- // They are
- // called from parallel classifiers and evaluations. Is there a better way
- // to do that?
+ }
- /*
- * (non-Javadoc)
- *
- * @see org.apache.s4.core.App#createPE(java.lang.Class)
- */
- @Override
- public <T extends ProcessingElement> T createPE(Class<T> type) {
- return super.createPE(type);
- }
+ /**
+ * Gets the task.
+ *
+ * @return the task
+ */
+ public Task getTask() {
+ return task;
+ }
- /*
- * (non-Javadoc)
- *
- * @see org.apache.s4.core.App#createStream(java.lang.String, org.apache.s4.base.KeyFinder, org.apache.s4.core.ProcessingElement[])
- */
- @Override
- public <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder, ProcessingElement... processingElements) {
- return super.createStream(name, finder, processingElements);
- }
+ // These methods are protected in App and cannot be accessed from outside.
+ // They are called from parallel classifiers and evaluations. Is there a
+ // better way to do that?
- /*
- * (non-Javadoc)
- *
- * @see org.apache.s4.core.App#createStream(java.lang.String, org.apache.s4.core.ProcessingElement[])
- */
- @Override
- public <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) {
- return super.createStream(name, processingElements);
- }
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.s4.core.App#createPE(java.lang.Class)
+ */
+ @Override
+ public <T extends ProcessingElement> T createPE(Class<T> type) {
+ return super.createPE(type);
+ }
- // @com.beust.jcommander.Parameters(separators = "=")
- // class Parameters {
- //
- // @Parameter(names={"-lc","-local"}, description="Local clustering method")
- // private String localClustering;
- //
- // @Parameter(names={"-gc","-global"},
- // description="Global clustering method")
- // private String globalClustering;
- //
- // }
- //
- // class ParametersConverter {// implements IStringConverter<String[]> {
- //
- //
- // public String[] convertToArgs(String value) {
- //
- // String[] params = value.split(",");
- // String[] args = new String[params.length*2];
- // for(int i=0; i<params.length ; i++) {
- // args[i] = params[i].split("=")[0];
- // args[i+1] = params[i].split("=")[1];
- // i++;
- // }
- // return args;
- // }
- //
- // }
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.s4.core.App#createStream(java.lang.String,
+ * org.apache.s4.base.KeyFinder, org.apache.s4.core.ProcessingElement[])
+ */
+ @Override
+ public <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder,
+ ProcessingElement... processingElements) {
+ return super.createStream(name, finder, processingElements);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.s4.core.App#createStream(java.lang.String,
+ * org.apache.s4.core.ProcessingElement[])
+ */
+ @Override
+ public <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) {
+ return super.createStream(name, processingElements);
+ }
+
+ // @com.beust.jcommander.Parameters(separators = "=")
+ // class Parameters {
+ //
+ // @Parameter(names={"-lc","-local"}, description="Local clustering method")
+ // private String localClustering;
+ //
+ // @Parameter(names={"-gc","-global"},
+ // description="Global clustering method")
+ // private String globalClustering;
+ //
+ // }
+ //
+ // class ParametersConverter {// implements IStringConverter<String[]> {
+ //
+ //
+ // public String[] convertToArgs(String value) {
+ //
+ // String[] params = value.split(",");
+ // String[] args = new String[params.length*2];
+ // for(int i=0; i<params.length ; i++) {
+ // args[i] = params[i].split("=")[0];
+ // args[i+1] = params[i].split("=")[1];
+ // i++;
+ // }
+ // return args;
+ // }
+ //
+ // }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java
----------------------------------------------------------------------
diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java
index 2b0c595..6f374fa 100644
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java
@@ -32,89 +32,89 @@ import com.yahoo.labs.samoa.topology.Stream;
public class S4EntranceProcessingItem extends ProcessingElement implements EntranceProcessingItem {
- private EntranceProcessor entranceProcessor;
- // private S4DoTask app;
- private int parallelism;
- protected Stream outputStream;
-
- /**
- * Constructor of an S4 entrance processing item.
- *
- * @param app
- * : S4 application
- */
- public S4EntranceProcessingItem(EntranceProcessor entranceProcessor, App app) {
- super(app);
- this.entranceProcessor = entranceProcessor;
- // this.app = (S4DoTask) app;
- // this.setSingleton(true);
- }
-
- public void setParallelism(int parallelism) {
- this.parallelism = parallelism;
- }
-
- public int getParallelism() {
- return this.parallelism;
- }
-
- @Override
- public EntranceProcessor getProcessor() {
- return this.entranceProcessor;
- }
-
- //
- // @Override
- // public void put(Instance inst) {
- // // do nothing
- // // may not needed
- // }
-
- @Override
- protected void onCreate() {
- // was commented
- if (this.entranceProcessor != null) {
- // TODO revisit if we need to change it to a clone() call
- this.entranceProcessor = (EntranceProcessor) this.entranceProcessor.newProcessor(this.entranceProcessor);
- this.entranceProcessor.onCreate(Integer.parseInt(getId()));
- }
- }
-
- @Override
- protected void onRemove() {
- // do nothing
- }
-
- //
- // /**
- // * Sets the entrance processing item processor.
- // *
- // * @param processor
- // */
- // public void setProcessor(Processor processor) {
- // this.entranceProcessor = processor;
- // }
-
- @Override
- public void setName(String name) {
- super.setName(name);
- }
-
- @Override
- public EntranceProcessingItem setOutputStream(Stream stream) {
- if (this.outputStream != null)
- throw new IllegalStateException("Output stream for an EntrancePI sohuld be initialized only once");
- this.outputStream = stream;
- return this;
- }
-
- public boolean injectNextEvent() {
- if (entranceProcessor.hasNext()) {
- ContentEvent nextEvent = this.entranceProcessor.nextEvent();
- outputStream.put(nextEvent);
- return entranceProcessor.hasNext();
- } else
- return false;
- // return !nextEvent.isLastEvent();
+ private EntranceProcessor entranceProcessor;
+ // private S4DoTask app;
+ private int parallelism;
+ protected Stream outputStream;
+
+ /**
+ * Constructor of an S4 entrance processing item.
+ *
+ * @param app
+ * : S4 application
+ */
+ public S4EntranceProcessingItem(EntranceProcessor entranceProcessor, App app) {
+ super(app);
+ this.entranceProcessor = entranceProcessor;
+ // this.app = (S4DoTask) app;
+ // this.setSingleton(true);
+ }
+
+ public void setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+ }
+
+ public int getParallelism() {
+ return this.parallelism;
+ }
+
+ @Override
+ public EntranceProcessor getProcessor() {
+ return this.entranceProcessor;
+ }
+
+ //
+ // @Override
+ // public void put(Instance inst) {
+ // // do nothing
+ // // may not be needed
+ // }
+
+ @Override
+ protected void onCreate() {
+ // was commented
+ if (this.entranceProcessor != null) {
+ // TODO revisit if we need to change it to a clone() call
+ this.entranceProcessor = (EntranceProcessor) this.entranceProcessor.newProcessor(this.entranceProcessor);
+ this.entranceProcessor.onCreate(Integer.parseInt(getId()));
}
+ }
+
+ @Override
+ protected void onRemove() {
+ // do nothing
+ }
+
+ //
+ // /**
+ // * Sets the entrance processing item processor.
+ // *
+ // * @param processor
+ // */
+ // public void setProcessor(Processor processor) {
+ // this.entranceProcessor = processor;
+ // }
+
+ @Override
+ public void setName(String name) {
+ super.setName(name);
+ }
+
+ @Override
+ public EntranceProcessingItem setOutputStream(Stream stream) {
+ if (this.outputStream != null)
+ throw new IllegalStateException("Output stream for an EntrancePI sohuld be initialized only once");
+ this.outputStream = stream;
+ return this;
+ }
+
+ public boolean injectNextEvent() {
+ if (entranceProcessor.hasNext()) {
+ ContentEvent nextEvent = this.entranceProcessor.nextEvent();
+ outputStream.put(nextEvent);
+ return entranceProcessor.hasNext();
+ } else
+ return false;
+ // return !nextEvent.isLastEvent();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java
----------------------------------------------------------------------
diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java
index 8f8ad9f..62c623c 100644
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java
@@ -36,55 +36,57 @@ import com.yahoo.labs.samoa.core.ContentEvent;
@Immutable
final public class S4Event extends Event {
- private String key;
-
- public String getKey() {
- return key;
- }
-
- public void setKey(String key) {
- this.key = key;
- }
-
- /** The content event. */
- private ContentEvent contentEvent;
-
- /**
- * Instantiates a new instance event.
- */
- public S4Event() {
- // Needed for serialization of kryo
- }
-
- /**
- * Instantiates a new instance event.
- *
- * @param contentEvent the content event
- */
- public S4Event(ContentEvent contentEvent) {
- if (contentEvent != null) {
- this.contentEvent = contentEvent;
- this.key = contentEvent.getKey();
-
- }
- }
-
- /**
- * Gets the content event.
- *
- * @return the content event
- */
- public ContentEvent getContentEvent() {
- return contentEvent;
- }
-
- /**
- * Sets the content event.
- *
- * @param contentEvent the new content event
- */
- public void setContentEvent(ContentEvent contentEvent) {
- this.contentEvent = contentEvent;
- }
+ private String key;
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ /** The content event. */
+ private ContentEvent contentEvent;
+
+ /**
+ * Instantiates a new instance event.
+ */
+ public S4Event() {
+ // Needed for serialization of kryo
+ }
+
+ /**
+ * Instantiates a new instance event.
+ *
+ * @param contentEvent
+ * the content event
+ */
+ public S4Event(ContentEvent contentEvent) {
+ if (contentEvent != null) {
+ this.contentEvent = contentEvent;
+ this.key = contentEvent.getKey();
+
+ }
+ }
+
+ /**
+ * Gets the content event.
+ *
+ * @return the content event
+ */
+ public ContentEvent getContentEvent() {
+ return contentEvent;
+ }
+
+ /**
+ * Sets the content event.
+ *
+ * @param contentEvent
+ * the new content event
+ */
+ public void setContentEvent(ContentEvent contentEvent) {
+ this.contentEvent = contentEvent;
+ }
}