You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2016/09/27 16:41:19 UTC
[5/6] apex-malhar git commit: Fix trailing whitespace.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
index a19417c..2bbb903 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
@@ -74,7 +74,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator
transient Channel channel = null;
transient String exchange = "testEx";
transient String queueName="testQ";
-
+
private WindowDataManager windowDataManager;
private transient long currentWindowId;
private transient long largestRecoveryWindowId;
@@ -86,7 +86,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator
@Override
public void setup(OperatorContext context)
{
- // Needed to setup idempotency storage manager in setter
+ // Needed to setup idempotency storage manager in setter
this.context = context;
this.operatorContextId = context.getId();
@@ -104,11 +104,11 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator
DTThrowable.rethrow(ex);
}
}
-
+
@Override
public void beginWindow(long windowId)
{
- currentWindowId = windowId;
+ currentWindowId = windowId;
largestRecoveryWindowId = windowDataManager.getLargestCompletedWindow();
if (windowId <= largestRecoveryWindowId) {
// Do not resend already sent tuples
@@ -119,7 +119,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator
skipProcessingTuple = false;
}
}
-
+
/**
* {@inheritDoc}
*/
@@ -158,11 +158,11 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator
logger.debug(ex.toString());
}
}
-
+
public WindowDataManager getWindowDataManager() {
return windowDataManager;
}
-
+
public void setWindowDataManager(WindowDataManager windowDataManager) {
this.windowDataManager = windowDataManager;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java
index 74ae181..1ddd9d4 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java
@@ -38,7 +38,7 @@ import com.datatorrent.netlet.util.DTThrowable;
public class RabbitMQOutputOperator extends AbstractSinglePortRabbitMQOutputOperator<byte[]>
{
private static final Logger logger = LoggerFactory.getLogger(RabbitMQOutputOperator.class);
-
+
@Override
public void processTuple(byte[] tuple)
{
@@ -46,6 +46,6 @@ public class RabbitMQOutputOperator extends AbstractSinglePortRabbitMQOutputOper
channel.basicPublish(exchange, "", null, tuple);
} catch (IOException e) {
DTThrowable.rethrow(e);
- }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
index 59b320d..0b12574 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
@@ -38,7 +38,7 @@ import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
/**
* This is the base implementation of a Redis input operator.
- *
+ *
* @displayName Abstract Redis Input
* @category Input
* @tags redis, key value
@@ -161,7 +161,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor
scanComplete = false;
scanParameters = new ScanParams();
scanParameters.count(scanCount);
-
+
// For the 1st window after checkpoint, windowID - 1 would not have recovery
// offset stored in windowDataManager
// But recoveryOffset is non-transient, so will be recovered with
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
index de9ee45..ae8ef5c 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
@@ -28,7 +28,7 @@ import com.datatorrent.lib.util.KeyValPair;
* This is the an implementation of a Redis input operator for fetching
* Key-Value pair stored in Redis. It takes in keys to fetch and emits
* corresponding <Key, Value> Pair. Value data type is String in this case.
- *
+ *
* @displayName Redis Input Operator for Key Value pair
* @category Store
* @tags input operator, key value
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
index a9913f9..156252b 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
@@ -25,8 +25,8 @@ import com.datatorrent.lib.util.KeyValPair;
/**
* This is the an implementation of a Redis input operator It takes in keys to
* fetch and emits Values stored as Maps in Redis i.e. when value datatype in
- * Redis is HashMap
- *
+ * Redis is HashMap
+ *
* @displayName Redis Input Operator for Map
* @category Store
* @tags input operator, key value
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
index 27b4fd8..b540779 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
@@ -192,7 +192,7 @@ public class RedisStore implements TransactionableKeyValueStore
}
/**
- * Gets the stored Map for given the key, when the value data type is a map, stored with hmset
+ * Gets the stored Map for given the key, when the value data type is a map, stored with hmset
*
* @param key
* @return hashmap stored for the key.
@@ -329,8 +329,8 @@ public class RedisStore implements TransactionableKeyValueStore
}
}
}
-
-
+
+
/**
* @return the timeOut
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/solr/AbstractSolrOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/solr/AbstractSolrOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/solr/AbstractSolrOutputOperator.java
index 705e09c..805238c 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/solr/AbstractSolrOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/solr/AbstractSolrOutputOperator.java
@@ -84,7 +84,7 @@ public abstract class AbstractSolrOutputOperator<T, S extends Connectable> exten
/**
* Converts the object into Solr document format
- *
+ *
* @param object to be stored to Solr Server
* @return
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/solr/ConcurrentUpdateSolrServerConnector.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/solr/ConcurrentUpdateSolrServerConnector.java b/contrib/src/main/java/com/datatorrent/contrib/solr/ConcurrentUpdateSolrServerConnector.java
index 3e86949..d9237c2 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/solr/ConcurrentUpdateSolrServerConnector.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/solr/ConcurrentUpdateSolrServerConnector.java
@@ -110,7 +110,7 @@ public class ConcurrentUpdateSolrServerConnector extends SolrServerConnector
}
/*
- * HttpClient instance
+ * HttpClient instance
* Gets the HTTP Client instance
*/
public HttpClient getHttpClient()
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkStore.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkStore.java b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkStore.java
index ede1f3a..3abdb1b 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkStore.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkStore.java
@@ -18,7 +18,7 @@
*/
package com.datatorrent.contrib.splunk;
-import com.splunk.*;
+import com.splunk.*;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java
index 15aaa0b..05d5e52 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java
@@ -31,7 +31,7 @@ package com.datatorrent.contrib.zmq;
public class ZeroMQInputOperator extends AbstractSinglePortZeroMQInputOperator<byte[]>
{
@Override
- public byte[] getTuple(byte[] message) {
+ public byte[] getTuple(byte[] message) {
return message;
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Change.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Change.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Change.java
index 146a65d..51d90ab 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Change.java
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Change.java
@@ -25,7 +25,7 @@ import com.datatorrent.lib.util.BaseNumberValueOperator;
/**
* Operator compares data values arriving on input port with base value input operator.
- *
+ *
* <p>
* Arriving base value is stored in operator for comparison, old base value is overwritten.
* This emits <change in value,percentage change>.
@@ -80,7 +80,7 @@ public class Change<V extends Number> extends BaseNumberValueOperator<V>
}
}
};
-
+
/**
* Input port that takes a number It stores the value for base comparison.
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMap.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMap.java
index 155cb23..bfa3c0a 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMap.java
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMap.java
@@ -31,7 +31,7 @@ import com.datatorrent.lib.util.UnifierHashMap;
* Operator compares based on the property "key", "value", and "compare".
* <p>
* The comparison is done by getting double value from the Number.
- * Passed tuples are emitted on the output port "compare".
+ * Passed tuples are emitted on the output port "compare".
* Failed tuples are emitted on port "except".
* Both output ports are optional, but at least one has to be connected.
* This module is a pass through<br>
@@ -91,7 +91,7 @@ public class CompareExceptMap<K, V extends Number> extends MatchMap<K, V>
*/
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<HashMap<K, V>> compare = match;
-
+
/**
* Output port that emits a hashmap of non matching tuples after comparison.
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ExceptMap.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ExceptMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ExceptMap.java
index 3dcae74..2dcb583 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ExceptMap.java
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ExceptMap.java
@@ -66,7 +66,7 @@ import com.datatorrent.lib.util.UnifierHashMap;
@Deprecated
@Stateless
public class ExceptMap<K, V extends Number> extends MatchMap<K, V>
-{
+{
/**
* Output port that emits non matching number tuples.
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Quotient.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Quotient.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Quotient.java
index e1deb9d..8909acd 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Quotient.java
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Quotient.java
@@ -24,7 +24,7 @@ import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.lib.util.BaseNumberValueOperator;
/**
- * This operator adds all the values on "numerator" and "denominator" and emits quotient at end of window.
+ * This operator adds all the values on "numerator" and "denominator" and emits quotient at end of window.
* <p>
* <br>
* <b>StateFull : Yes </b>, Sum of values is taken over application window. <br>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/QuotientMap.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/QuotientMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/QuotientMap.java
index 3581b81..b37bbd5 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/QuotientMap.java
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/QuotientMap.java
@@ -31,7 +31,7 @@ import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
/**
- * Add all the values for each key on "numerator" and "denominator" and emits quotient at end of window for all keys in the denominator.
+ * Add all the values for each key on "numerator" and "denominator" and emits quotient at end of window for all keys in the denominator.
* <p>
* <br>
* Application can set multiplication value for quotient(default = 1). <br>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/SumCountMap.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/SumCountMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/SumCountMap.java
index 048eff7..b2493a1 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/SumCountMap.java
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/SumCountMap.java
@@ -178,7 +178,7 @@ public class SumCountMap<K, V extends Number> extends
return ret;
}
};
-
+
/**
* Key,short sum output port.
*/
@@ -194,7 +194,7 @@ public class SumCountMap<K, V extends Number> extends
return ret;
}
};
-
+
/**
* Key,float sum output port.
*/
@@ -210,7 +210,7 @@ public class SumCountMap<K, V extends Number> extends
return ret;
}
};
-
+
/**
* Key,integer sum output port.
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java
index 38a4804..1f8dc5c 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java
@@ -61,7 +61,7 @@ import com.datatorrent.lib.util.PojoUtils;
* <b>err</b>:tuples that could not be parsed are emitted on this port as
* KeyValPair<String,String><br>
* Key being the tuple and Val being the reason
- *
+ *
* @displayName SimpleStreamingJsonParser
* @category Parsers
* @tags json pojo parser streaming
@@ -187,7 +187,7 @@ public class StreamingJsonParser extends Parser<byte[], KeyValPair<String, Strin
/**
* Creates a map representing fieldName in POJO:field in JSON:Data type
- *
+ *
* @return List of FieldInfo
*/
private List<FieldInfo> createFieldInfoMap(String str)
@@ -255,7 +255,7 @@ public class StreamingJsonParser extends Parser<byte[], KeyValPair<String, Strin
/**
* Use reflection to generate field info values if the user has not provided
* the inputs mapping
- *
+ *
* @return String representing the POJO field to JSON field mapping
*/
private String generateFieldInfoInputs(Class<?> cls)
@@ -331,7 +331,7 @@ public class StreamingJsonParser extends Parser<byte[], KeyValPair<String, Strin
/**
* Returns a POJO from a Generic Record Null is set as the default value if a
* key is not found in the parsed JSON
- *
+ *
* @return Object
*/
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/couchbase/CouchBaseSetTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/couchbase/CouchBaseSetTest.java b/contrib/src/test/java/com/datatorrent/contrib/couchbase/CouchBaseSetTest.java
index 9c99ad2..f57279d 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/couchbase/CouchBaseSetTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/couchbase/CouchBaseSetTest.java
@@ -90,7 +90,7 @@ public class CouchBaseSetTest
System.err.println("Error connecting to Couchbase: " + e.getMessage());
System.exit(1);
}
-
+
TestPojo obj = new TestPojo();
obj.setName("test");
obj.setPhone(123344555);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchOperatorTest.java
index 161fe90..671d7dc 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchOperatorTest.java
@@ -89,7 +89,7 @@ public class ElasticSearchOperatorTest
ElasticSearchMapOutputOperator<Map<String, Object>> operator = new ElasticSearchMapOutputOperator<Map<String, Object>>() {
/*
* (non-Javadoc)
- *
+ *
* @see com.datatorrent.contrib.elasticsearch. AbstractElasticSearchOutputOperator #processTuple(java.lang.Object)
*/
@Override
@@ -128,7 +128,7 @@ public class ElasticSearchOperatorTest
/**
* Read data written to elastic search
- *
+ *
* @param tupleIDs
* @param testStartTime
*/
@@ -137,7 +137,7 @@ public class ElasticSearchOperatorTest
ElasticSearchMapInputOperator<Map<String, Object>> operator = new ElasticSearchMapInputOperator<Map<String, Object>>() {
/**
* Set SearchRequestBuilder parameters specific to current window.
- *
+ *
* @see com.datatorrent.contrib.elasticsearch.ElasticSearchMapInputOperator#getSearchRequestBuilder()
*/
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolateTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolateTest.java b/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolateTest.java
index f707f1b..daf1602 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolateTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolateTest.java
@@ -77,9 +77,9 @@ public class ElasticSearchPercolateTest
/**
* Register percolate queries on ElasticSearch
- *
+ *
* @throws IOException
- *
+ *
*/
private void registerPercolateQueries() throws IOException
{
@@ -89,7 +89,7 @@ public class ElasticSearchPercolateTest
}
/**
- *
+ *
*/
private void checkPercolateResponse()
{
@@ -136,7 +136,7 @@ public class ElasticSearchPercolateTest
matchIds.add(match.getId().toString());
}
Collections.sort(matchIds);
-
+
Assert.assertArrayEquals(matchIds.toArray(), matches[i]);
i++;
}
@@ -157,6 +157,6 @@ public class ElasticSearchPercolateTest
//This indicates that elasticsearch is not running on a particular machine.
//Silently ignore in this case.
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java
index 5c59622..5bde40c 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java
@@ -59,7 +59,7 @@ public class GeodeCheckpointStoreTest
store.setTableName(REGION_NAME);
store.connect();
}
-
+
@Test
public void testSave() throws IOException
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java
index 4e6bb39..6a2f891 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java
@@ -54,7 +54,7 @@ public class HBasePOJOInputOperatorTest
HBASEINPUT,
OUTPUT
};
-
+
public static class MyGenerator extends TupleGenerateCacheOperator<TestPOJO>
{
public MyGenerator()
@@ -84,7 +84,7 @@ public class HBasePOJOInputOperatorTest
private HBaseStore store;
private HBasePOJOPutOperator hbaseOutputOperator;
private TestHBasePOJOInputOperator hbaseInputOperator;
-
+
@Before
public void prepare() throws Exception
{
@@ -93,13 +93,13 @@ public class HBasePOJOInputOperatorTest
setupOperators();
HBaseUtil.createTable( store.getConfiguration(), store.getTableName());
}
-
+
@After
public void cleanup() throws Exception
{
HBaseUtil.deleteTable( store.getConfiguration(), store.getTableName());
}
-
+
@Test
public void test() throws Exception
@@ -119,20 +119,20 @@ public class HBasePOJOInputOperatorTest
// Create ActiveMQStringSinglePortOutputOperator
MyGenerator generator = dag.addOperator( OPERATOR.GENERATOR.name(), MyGenerator.class);
generator.setTupleNum( TUPLE_NUM );
-
+
hbaseOutputOperator = dag.addOperator( OPERATOR.HBASEOUTPUT.name(), hbaseOutputOperator );
hbaseInputOperator = dag.addOperator(OPERATOR.HBASEINPUT.name(), hbaseInputOperator);
dag.setOutputPortAttribute(hbaseInputOperator.outputPort, Context.PortContext.TUPLE_CLASS, TestPOJO.class);
-
-
+
+
TupleCacheOutputOperator output = dag.addOperator(OPERATOR.OUTPUT.name(), TupleCacheOutputOperator.class);
-
+
// Connect ports
dag.addStream("queue1", generator.outputPort, hbaseOutputOperator.input ).setLocality(DAG.Locality.NODE_LOCAL);
dag.addStream("queue2", hbaseInputOperator.outputPort, output.inputPort ).setLocality(DAG.Locality.NODE_LOCAL);
-
-
+
+
Configuration conf = new Configuration(false);
lma.prepareDAG(app, conf);
@@ -158,10 +158,10 @@ public class HBasePOJOInputOperatorTest
throw new RuntimeException("Testcase taking too long");
}
}
-
+
lc.shutdown();
-
+
validate( generator.getTuples(), output.getReceivedTuples() );
}
@@ -173,11 +173,11 @@ public class HBasePOJOInputOperatorTest
actual.removeAll(expected);
Assert.assertTrue( "content not same.", actual.isEmpty() );
}
-
+
protected void setupOperators()
{
TableInfo<HBaseFieldInfo> tableInfo = new TableInfo<HBaseFieldInfo>();
-
+
tableInfo.setRowOrIdExpression("row");
List<HBaseFieldInfo> fieldsInfo = new ArrayList<HBaseFieldInfo>();
@@ -186,10 +186,10 @@ public class HBasePOJOInputOperatorTest
fieldsInfo.add( new HBaseFieldInfo( "address", "address", SupportType.STRING, "f1") );
tableInfo.setFieldsInfo(fieldsInfo);
-
+
hbaseInputOperator.setTableInfo(tableInfo);
hbaseOutputOperator.setTableInfo(tableInfo);
-
+
store = new HBaseStore();
store.setTableName("test");
store.setZookeeperQuorum("localhost");
@@ -197,7 +197,7 @@ public class HBasePOJOInputOperatorTest
hbaseInputOperator.setStore(store);
hbaseOutputOperator.setStore(store);
-
+
OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
OPERATOR_ID, new AttributeMap.DefaultAttributeMap());
hbaseInputOperator.setup(context);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOPutOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOPutOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOPutOperatorTest.java
index 51dbadc..8c81560 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOPutOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOPutOperatorTest.java
@@ -52,11 +52,11 @@ public class HBasePOJOPutOperatorTest
private static final Logger logger = LoggerFactory.getLogger(HBasePOJOPutOperatorTest.class);
public static final int TEST_SIZE = 15000;
public static final int WINDOW_SIZE = 1500;
-
+
private HBasePOJOPutOperator operator;
-
+
private final long startWindowId = Calendar.getInstance().getTimeInMillis();
-
+
public HBasePOJOPutOperatorTest()
{
}
@@ -69,13 +69,13 @@ public class HBasePOJOPutOperatorTest
createOrDeleteTable(operator.getStore(), false );
}
-
+
@After
public void cleanup() throws Exception
{
createOrDeleteTable(operator.getStore(), true );
}
-
+
/**
* this test case only test if HBasePojoPutOperator can save data to the
* HBase. it doesn't test connection to the other operators
@@ -107,7 +107,7 @@ public class HBasePOJOPutOperatorTest
Thread.sleep(30000);
-
+
}
catch (Exception e)
{
@@ -115,7 +115,7 @@ public class HBasePOJOPutOperatorTest
Assert.fail(e.getMessage());
}
}
-
+
protected void createOrDeleteTable(HBaseStore store, boolean isDelete ) throws Exception
{
HBaseAdmin admin = null;
@@ -123,7 +123,7 @@ public class HBasePOJOPutOperatorTest
{
admin = new HBaseAdmin(store.getConfiguration());
final String tableName = store.getTableName();
-
+
if (!admin.isTableAvailable(tableName) && !isDelete )
{
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
@@ -170,14 +170,14 @@ public class HBasePOJOPutOperatorTest
OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
OPERATOR_ID, attributeMap);
-
+
operator.setup(context);
}
protected void configure(HBasePOJOPutOperator operator)
{
TableInfo<HBaseFieldInfo> tableInfo = new TableInfo<HBaseFieldInfo>();
-
+
tableInfo.setRowOrIdExpression("row");
List<HBaseFieldInfo> fieldsInfo = new ArrayList<HBaseFieldInfo>();
@@ -203,7 +203,7 @@ public class HBasePOJOPutOperatorTest
{
if( tupleGenerator == null )
tupleGenerator = new TupleGenerator<TestPOJO>( TestPOJO.class );
-
+
return tupleGenerator.getNextTuple();
}
@@ -225,21 +225,21 @@ public class HBasePOJOPutOperatorTest
HTable table = operator.getStore().getTable();
Scan scan = new Scan();
ResultScanner resultScanner = table.getScanner(scan);
-
+
int recordCount = 0;
while( true )
{
Result result = resultScanner.next();
if( result == null )
break;
-
+
int rowId = Integer.valueOf( Bytes.toString( result.getRow() ) );
Assert.assertTrue( "rowId="+rowId+" aut of range" , ( rowId > 0 && rowId <= TEST_SIZE ) );
Assert.assertTrue( "the rowId="+rowId+" already processed.", rowIds[rowId-1] == 1 );
rowIds[rowId-1]=0;
-
+
List<Cell> cells = result.listCells();
-
+
Map<String, byte[]> map = new HashMap<String,byte[]>();
for( Cell cell : cells )
{
@@ -250,17 +250,17 @@ public class HBasePOJOPutOperatorTest
TestPOJO read = TestPOJO.from(map);
read.setRowId((long)rowId);
TestPOJO expected = new TestPOJO( rowId );
-
+
Assert.assertTrue( String.format( "expected %s, get %s ", expected.toString(), read.toString() ), expected.completeEquals(read) );
recordCount++;
}
-
+
int missedCount = 0;
if( recordCount != TEST_SIZE )
{
logger.error( "unsaved records: " );
StringBuilder sb = new StringBuilder();
-
+
for( int i=0; i<TEST_SIZE; ++i )
{
if( rowIds[i] != 0 )
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java
index eef69d4..3cdc1bf 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java
@@ -66,7 +66,7 @@ public class HBaseTransactionalPutOperatorTest {
}
@Override
- public AttributeMap getAttributes() {
+ public AttributeMap getAttributes() {
return null;
}
@@ -136,7 +136,7 @@ public class HBaseTransactionalPutOperatorTest {
}
@Override
- public AttributeMap getAttributes() {
+ public AttributeMap getAttributes() {
return null;
}
@@ -210,7 +210,7 @@ public class HBaseTransactionalPutOperatorTest {
}
@Override
- public AttributeMap getAttributes() {
+ public AttributeMap getAttributes() {
return null;
}
@@ -238,8 +238,8 @@ public class HBaseTransactionalPutOperatorTest {
}
});
-
-
+
+
thop.input.process(t2);
thop.endWindow();
HBaseTuple tuple,tuple2;
@@ -257,7 +257,7 @@ public class HBaseTransactionalPutOperatorTest {
logger.error(e.getMessage());
}
}
-
+
public static class TestHBasePutOperator extends
AbstractHBaseWindowPutOutputOperator<HBaseTuple> {
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseUtil.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseUtil.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseUtil.java
index 9c237d7..5b54f3e 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseUtil.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseUtil.java
@@ -29,13 +29,13 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
public class HBaseUtil
{
- public static void createTable(Configuration configuration, String tableName ) throws MasterNotRunningException, ZooKeeperConnectionException, IOException
+ public static void createTable(Configuration configuration, String tableName ) throws MasterNotRunningException, ZooKeeperConnectionException, IOException
{
HBaseAdmin admin = null;
try
{
admin = new HBaseAdmin( configuration );
-
+
if (!admin.isTableAvailable(tableName) )
{
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
@@ -54,14 +54,14 @@ public class HBaseUtil
}
}
}
-
+
public static void deleteTable( Configuration configuration, String tableName ) throws MasterNotRunningException, ZooKeeperConnectionException, IOException
{
HBaseAdmin admin = null;
try
{
admin = new HBaseAdmin( configuration );
-
+
if ( admin.isTableAvailable(tableName) )
{
admin.disableTable(tableName);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/helper/MessageQueueTestHelper.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/helper/MessageQueueTestHelper.java b/contrib/src/test/java/com/datatorrent/contrib/helper/MessageQueueTestHelper.java
index e2eec7b..5465c28 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/helper/MessageQueueTestHelper.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/helper/MessageQueueTestHelper.java
@@ -47,7 +47,7 @@ public class MessageQueueTestHelper {
}
}
- public static ArrayList<HashMap<String, Integer>> getMessages()
+ public static ArrayList<HashMap<String, Integer>> getMessages()
{
ArrayList<HashMap<String, Integer>> mapList = new ArrayList<HashMap<String, Integer>>();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest.java
index e20a9fe..5f32fb0 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest.java
@@ -45,7 +45,7 @@ public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase
private static final int maxTuple = 40;
private static CountDownLatch latch;
private static boolean isRestarted = false;
-
+
/**
* Tuple generator for testing.
*/
@@ -68,7 +68,7 @@ public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase
@Override
public void setup(OperatorContext context)
{
-
+
}
@Override
@@ -110,7 +110,7 @@ public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase
for (int i = stringBuffer.size(); i-- > 0;) {
if (i == 20 && isRestarted == false) {
isRestarted = true;
- // fail the operator and when it gets back resend everything
+ // fail the operator and when it gets back resend everything
throw new RuntimeException();
}
outputPort.emit(stringBuffer.poll());
@@ -144,7 +144,7 @@ public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase
StringGeneratorInputOperator generator = dag.addOperator("TestStringGenerator", StringGeneratorInputOperator.class);
final SimpleKafkaExactOnceOutputOperator node = dag.addOperator("Kafka message producer", SimpleKafkaExactOnceOutputOperator.class);
-
+
Properties props = new Properties();
props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "localhost:9092");
@@ -152,7 +152,7 @@ public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase
props.setProperty("queue.buffering.max.ms", "200");
props.setProperty("queue.buffering.max.messages", "10");
props.setProperty("batch.num.messages", "5");
-
+
node.setConfigProperties(props);
// Set configuration parameters for Kafka
node.setTopic("topic1");
@@ -160,14 +160,14 @@ public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase
// Connect ports
dag.addStream("Kafka message", generator.outputPort, node.inputPort).setLocality(Locality.CONTAINER_LOCAL);
-
+
// Create local cluster
final LocalMode.Controller lc = lma.getController();
lc.runAsync();
Future f = Executors.newFixedThreadPool(1).submit(listener);
f.get(30, TimeUnit.SECONDS);
-
+
lc.shutdown();
// Check values send vs received
@@ -176,9 +176,9 @@ public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase
Assert.assertEquals("First tuple", "testString 1", listener.getMessage(listener.holdingBuffer.peek()));
listener.close();
-
+
}
-
+
public static class SimpleKafkaExactOnceOutputOperator extends AbstractExactlyOnceKafkaOutputOperator<String, String, String>{
@Override
@@ -192,7 +192,7 @@ public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase
{
return new Pair<String, String>(tuple.split("###")[0], tuple.split("###")[1]);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestPartitioner.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestPartitioner.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestPartitioner.java
index 0e3e4e5..e409353 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestPartitioner.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestPartitioner.java
@@ -31,7 +31,7 @@ import kafka.utils.VerifiableProperties;
public class KafkaTestPartitioner implements Partitioner
{
public KafkaTestPartitioner (VerifiableProperties props) {
-
+
}
@Override
public int partition(Object key, int num_Partitions)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java
index 0a72a2e..cbd946a 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java
@@ -40,7 +40,7 @@ public class KafkaTestProducer implements Runnable
private boolean hasPartition = false;
private boolean hasMultiCluster = false;
private List<String> messages;
-
+
private String producerType = "async";
public int getSendCount()
@@ -95,7 +95,7 @@ public class KafkaTestProducer implements Runnable
producer1 = null;
}
}
-
+
public KafkaTestProducer(String topic, boolean hasPartition) {
this(topic, hasPartition, false);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOperatorTestBase.java b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOperatorTestBase.java
index f3f8478..c9948ba 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOperatorTestBase.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOperatorTestBase.java
@@ -44,7 +44,7 @@ public class KinesisOperatorTestBase
protected transient AWSCredentialsProvider credentials = null;
private static final Logger logger = LoggerFactory.getLogger(KinesisOperatorTestBase.class);
-
+
private void createClient()
{
credentials = new DefaultAWSCredentialsProviderChain();
@@ -56,27 +56,27 @@ public class KinesisOperatorTestBase
{
CreateStreamRequest streamRequest = null;
createClient();
-
+
for( int i=0; i<100; ++i )
{
- try
+ try
{
streamName = streamNamePrefix + i;
streamRequest = new CreateStreamRequest();
streamRequest.setStreamName(streamName);
streamRequest.setShardCount(shardCount);
client.createStream(streamRequest);
-
+
logger.info( "created stream {}.", streamName );
Thread.sleep(30000);
-
+
break;
}
catch( ResourceInUseException riue )
{
logger.warn( "Resource is in use.", riue.getMessage() );
}
- catch (Exception e)
+ catch (Exception e)
{
logger.error( "Got exception.", e );
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOutputOperatorTest.java
index 368a191..b478b9f 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOutputOperatorTest.java
@@ -52,7 +52,7 @@ public abstract class KinesisOutputOperatorTest< O extends AbstractKinesisOutput
super.beforeTest();
}
-
+
/**
* Test AbstractKinesisOutputOperator (i.e. an output adapter for Kinesis, aka producer).
* This module sends data into an ActiveMQ message bus.
@@ -94,7 +94,7 @@ public abstract class KinesisOutputOperatorTest< O extends AbstractKinesisOutput
// Create ActiveMQStringSinglePortOutputOperator
G generator = addGenerateOperator( dag );
-
+
O node = addTestingOperator( dag );
configureTestingOperator( node );
@@ -119,13 +119,13 @@ public abstract class KinesisOutputOperatorTest< O extends AbstractKinesisOutput
}
catch( Exception e ){}
}
-
+
if( listener != null )
listener.setIsAlive(false);
-
+
if( listenerThread != null )
listenerThread.join( 1000 );
-
+
lc.shutdown();
// Check values send vs received
@@ -141,10 +141,10 @@ public abstract class KinesisOutputOperatorTest< O extends AbstractKinesisOutput
protected KinesisTestConsumer createConsumerListener( String streamName )
{
KinesisTestConsumer listener = new KinesisTestConsumer(streamName);
-
+
return listener;
}
-
+
protected void configureTestingOperator( O node )
{
node.setAccessKey(credentials.getCredentials().getAWSAccessKeyId());
@@ -152,7 +152,7 @@ public abstract class KinesisOutputOperatorTest< O extends AbstractKinesisOutput
node.setBatchSize(500);
node.setStreamName(streamName);
}
-
+
protected abstract G addGenerateOperator( DAG dag );
protected abstract DefaultOutputPort getOutputPortOfGenerator( G generator );
protected abstract O addTestingOperator( DAG dag );
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisStringOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisStringOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisStringOutputOperatorTest.java
index 33f3179..f0a9eb7 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisStringOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisStringOutputOperatorTest.java
@@ -34,13 +34,13 @@ public class KinesisStringOutputOperatorTest extends KinesisOutputOperatorTest<
return dag.addOperator("TestStringGenerator", StringGeneratorInputOperator.class);
//StringGeneratorInputOperator generator =
}
-
+
@Override
protected DefaultOutputPort getOutputPortOfGenerator( StringGeneratorInputOperator generator )
{
return generator.outputPort;
}
-
+
@Override
protected KinesisStringOutputOperator addTestingOperator(DAG dag)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisTestConsumer.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisTestConsumer.java b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisTestConsumer.java
index 448ce72..a1547c1 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisTestConsumer.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisTestConsumer.java
@@ -48,11 +48,11 @@ public class KinesisTestConsumer implements Runnable
private volatile boolean isAlive = true;
private int receiveCount = 0;
-
+
private CountDownLatch doneLatch;
-
+
protected static final int MAX_TRY_TIMES = 30;
-
+
private void createClient()
{
AWSCredentialsProvider credentials = new DefaultAWSCredentialsProviderChain();
@@ -87,16 +87,16 @@ public class KinesisTestConsumer implements Runnable
buffer.get(bytes);
return new String(bytes);
}
-
+
@Override
public void run()
{
String iterator = prepareIterator();
-
- while (isAlive )
+
+ while (isAlive )
{
iterator = processNextIterator(iterator);
-
+
//sleep at least 1 second to avoid exceeding the limit on getRecords frequency
try
{
@@ -167,7 +167,7 @@ public class KinesisTestConsumer implements Runnable
return;
receiveCount += records.size();
logger.debug("ReceiveCount= {}", receiveCount);
-
+
for( Record record : records )
{
holdingBuffer.add(record);
@@ -175,18 +175,18 @@ public class KinesisTestConsumer implements Runnable
{
processRecord( record );
}
-
+
if( doneLatch != null )
doneLatch.countDown();
}
-
+
}
-
+
protected void processRecord( Record record )
{
-
+
}
-
+
public void close()
{
isAlive = false;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/memcache/MemcachePOJOOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/memcache/MemcachePOJOOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/memcache/MemcachePOJOOperatorTest.java
index 60db81c..2ae525c 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/memcache/MemcachePOJOOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/memcache/MemcachePOJOOperatorTest.java
@@ -34,16 +34,16 @@ import net.spy.memcached.AddrUtil;
public class MemcachePOJOOperatorTest
{
public static final int TUPLE_SIZE = 1000;
-
+
private MemcacheStore store;
-
+
@Before
public void setup()
{
store = new MemcacheStore();
store.setServerAddresses(AddrUtil.getAddresses("localhost:11211") );
}
-
+
public void cleanup()
{
if( store != null )
@@ -57,9 +57,9 @@ public class MemcachePOJOOperatorTest
DTThrowable.rethrow(e);
}
}
-
+
}
-
+
@SuppressWarnings("unchecked")
@Test
public void testMemcacheOutputOperatorInternal() throws Exception
@@ -74,21 +74,21 @@ public class MemcachePOJOOperatorTest
operator.setTableInfo( tableInfo );
operator.setup(null);
-
+
TupleGenerator<TestPOJO> generator = new TupleGenerator<TestPOJO>( TestPOJO.class );
-
+
for( int i=0; i<TUPLE_SIZE; ++i )
{
operator.processTuple( generator.getNextTuple() );
}
-
+
readDataAndVerify( operator.getStore(), generator );
}
-
+
public void readDataAndVerify( MemcacheStore store, TupleGenerator<TestPOJO> generator )
{
generator.reset();
-
+
for( int i=0; i<TUPLE_SIZE; ++i )
{
TestPOJO expected = generator.getNextTuple();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java
index aaa1e52..54c8d93 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java
@@ -51,7 +51,7 @@ public class AbstractMemsqlInputOperatorTest
public static final int NUM_WINDOWS = 10;
public static final int DATABASE_SIZE = NUM_WINDOWS * BLAST_SIZE;
public static final int OPERATOR_ID = 0;
-
+
public static void populateDatabase(MemsqlStore memsqlStore)
{
memsqlStore.connect();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java
index 6122477..a128181 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java
@@ -32,6 +32,6 @@ public class RabbitMQOutputOperatorBenchmark extends RabbitMQOutputOperatorTest
public void testDag() throws Exception
{
runTest(100000);
- logger.debug("end of test");
+ logger.debug("end of test");
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
index 010c534..6dcdfbe 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
@@ -136,7 +136,7 @@ public class RedisInputOperatorTest
RedisKeyValueInputOperator operator = new RedisKeyValueInputOperator();
operator.setWindowDataManager(new FSWindowDataManager());
-
+
operator.setStore(operatorStore);
operator.setScanCount(1);
Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkInputOperatorTest.java
index 654899a..32a4f39 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkInputOperatorTest.java
@@ -26,7 +26,7 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
/**
- *
+ *
* Unit test for splunk input operator. The test, queries splunk server for 100 rows and checks
* how many rows are returned.
*
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperatorTest.java
index 241f7e1..f1d9285 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperatorTest.java
@@ -25,7 +25,7 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
import com.google.common.collect.Lists;
/**
- *
+ *
* Unit test for splunk tcp output operator. The test sends 10 values to the splunk server and then
* queries it for last 10 rows to check if the values are same or not.
*
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java b/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java
index 2975c9c..7d34d71 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java
@@ -35,18 +35,18 @@ import com.datatorrent.lib.util.PojoUtils.Setter;
public class FieldValueSerializableGenerator< T extends FieldInfo> extends FieldValueGenerator<T>
{
-
+
public static < T extends FieldInfo > FieldValueSerializableGenerator<T> getFieldValueGenerator(final Class<?> clazz, List<T> fieldInfos)
{
return new FieldValueSerializableGenerator(clazz, fieldInfos);
}
-
-
+
+
private static final Logger logger = LoggerFactory.getLogger( FieldValueGenerator.class );
//it's better to same kryo instance for both de/serialize
private Kryo _kryo = null;
private Class<?> clazz;
-
+
private FieldValueSerializableGenerator(){}
public FieldValueSerializableGenerator(Class<?> clazz, List<T> fieldInfos)
@@ -58,7 +58,7 @@ public class FieldValueSerializableGenerator< T extends FieldInfo> extends Field
/**
* get the object which is serialized.
* this method will convert the object into a map from column name to column value and then serialize it
- *
+ *
* @param obj
* @return
*/
@@ -66,7 +66,7 @@ public class FieldValueSerializableGenerator< T extends FieldInfo> extends Field
{
//if don't have field information, just convert the whole object to byte[]
Object convertObj = obj;
-
+
//if fields are specified, convert to map and then convert map to byte[]
if( fieldGetterMap != null && !fieldGetterMap.isEmpty() )
{
@@ -82,15 +82,15 @@ public class FieldValueSerializableGenerator< T extends FieldInfo> extends Field
return os.toByteArray();
}
-
+
public Object deserializeObject( byte[] bytes )
{
Object obj = getKryo().readClassAndObject( new Input( bytes ) );
-
+
if( fieldGetterMap == null || fieldGetterMap.isEmpty() )
return obj;
-
+
// the obj in fact is a map, convert from map to object
try
{
@@ -114,7 +114,7 @@ public class FieldValueSerializableGenerator< T extends FieldInfo> extends Field
return obj;
}
}
-
+
protected Kryo getKryo()
{
if( _kryo == null )
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/util/POJOTupleGenerateOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/util/POJOTupleGenerateOperator.java b/contrib/src/test/java/com/datatorrent/contrib/util/POJOTupleGenerateOperator.java
index e2fc5cb..d9f5079 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/util/POJOTupleGenerateOperator.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/util/POJOTupleGenerateOperator.java
@@ -32,7 +32,7 @@ public class POJOTupleGenerateOperator<T> implements InputOperator, ActivationLi
{
protected final int DEFAULT_TUPLE_NUM = 10000;
public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<T>();
-
+
private int tupleNum = DEFAULT_TUPLE_NUM;
private int batchNum = 5;
private TupleGenerator<T> tupleGenerator = null;
@@ -42,17 +42,17 @@ public class POJOTupleGenerateOperator<T> implements InputOperator, ActivationLi
public POJOTupleGenerateOperator()
{
}
-
+
public POJOTupleGenerateOperator( Class<T> tupleClass )
{
this.tupleClass = tupleClass;
}
-
+
public void setTupleType( Class<T> tupleClass )
{
this.tupleClass = tupleClass;
}
-
+
@Override
public void beginWindow(long windowId)
{
@@ -96,39 +96,39 @@ public class POJOTupleGenerateOperator<T> implements InputOperator, ActivationLi
catch( Exception e ){}
return;
}
-
-
+
+
for( int i=0; i<batchNum; ++i )
{
int count = emitedTuples.get();
if( count >= theTupleNum )
return;
-
+
if( emitedTuples.compareAndSet(count, count+1) )
{
- T tuple = getNextTuple();
+ T tuple = getNextTuple();
outputPort.emit ( tuple );
tupleEmitted( tuple );
-
+
if( count+1 == theTupleNum )
{
tupleEmitDone();
return;
}
}
-
+
}
}
-
-
+
+
protected void tupleEmitted( T tuple ){}
protected void tupleEmitDone(){}
-
+
public int getEmitedTupleCount()
{
return emitedTuples.get();
}
-
+
public int getTupleNum()
{
return tupleNum;
@@ -137,7 +137,7 @@ public class POJOTupleGenerateOperator<T> implements InputOperator, ActivationLi
{
this.tupleNum = tupleNum;
}
-
+
protected T getNextTuple()
{
if( tupleGenerator == null )
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/util/TestPOJO.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/util/TestPOJO.java b/contrib/src/test/java/com/datatorrent/contrib/util/TestPOJO.java
index 462c0b3..99910be 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/util/TestPOJO.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/util/TestPOJO.java
@@ -40,15 +40,15 @@ public class TestPOJO implements Serializable
fieldsInfo.add( new FieldInfo( "name", "name", SupportType.STRING ) );
fieldsInfo.add( new FieldInfo( "age", "age", SupportType.INTEGER ) );
fieldsInfo.add( new FieldInfo( "address", "address", SupportType.STRING ) );
-
+
return fieldsInfo;
}
-
+
public static String getRowExpression()
{
return "row";
}
-
+
public static TestPOJO from( Map<String,byte[]> map )
{
TestPOJO testPOJO = new TestPOJO();
@@ -58,14 +58,14 @@ public class TestPOJO implements Serializable
}
return testPOJO;
}
-
+
private Long rowId = null;
private String name;
private int age;
private String address;
public TestPOJO(){}
-
+
public TestPOJO(long rowId)
{
this(rowId, "name" + rowId, (int) rowId, "address" + rowId);
@@ -78,7 +78,7 @@ public class TestPOJO implements Serializable
setAge(age);
setAddress(address);
}
-
+
public void setValue( String fieldName, byte[] value )
{
if( "row".equalsIgnoreCase(fieldName) )
@@ -148,7 +148,7 @@ public class TestPOJO implements Serializable
{
this.address = address;
}
-
+
@Override
public boolean equals( Object obj )
{
@@ -156,7 +156,7 @@ public class TestPOJO implements Serializable
return false;
if( !( obj instanceof TestPOJO ) )
return false;
-
+
return completeEquals( (TestPOJO)obj );
}
@@ -172,7 +172,7 @@ public class TestPOJO implements Serializable
return false;
return true;
}
-
+
public boolean completeEquals( TestPOJO other )
{
if( other == null )
@@ -183,7 +183,7 @@ public class TestPOJO implements Serializable
return false;
return true;
}
-
+
public <T> boolean fieldEquals( T v1, T v2 )
{
if( v1 == null && v2 == null )
@@ -192,7 +192,7 @@ public class TestPOJO implements Serializable
return false;
return v1.equals( v2 );
}
-
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/util/TupleCacheOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/util/TupleCacheOutputOperator.java b/contrib/src/test/java/com/datatorrent/contrib/util/TupleCacheOutputOperator.java
index 93dc189..4bd8c79 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/util/TupleCacheOutputOperator.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/util/TupleCacheOutputOperator.java
@@ -33,10 +33,10 @@ public class TupleCacheOutputOperator<T> extends BaseOperator
{
private static final long serialVersionUID = 3090932382383138500L;
private static final Logger logger = LoggerFactory.getLogger( TupleCacheOutputOperator.class );
-
- //one instance of TupleCacheOutputOperator map to one
+
+ //one instance of TupleCacheOutputOperator map to one
private static Map< String, List<?> > receivedTuplesMap = new HashMap< String, List<?>>();
-
+
public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>() {
@Override
@@ -45,14 +45,14 @@ public class TupleCacheOutputOperator<T> extends BaseOperator
processTuple( tuple );
}
};
-
+
private String uuid;
-
+
public TupleCacheOutputOperator()
{
uuid = java.util.UUID.randomUUID().toString();
}
-
+
public String getUuid()
{
return uuid;
@@ -74,7 +74,7 @@ public class TupleCacheOutputOperator<T> extends BaseOperator
{
return (List<T>)receivedTuplesMap.get(uuid);
}
-
+
public static List<Object> getReceivedTuples( String uuid )
{
return (List<Object>)receivedTuplesMap.get(uuid);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerateCacheOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerateCacheOperator.java b/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerateCacheOperator.java
index bfce6f5..8ee38dd 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerateCacheOperator.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerateCacheOperator.java
@@ -25,16 +25,16 @@ import java.util.Map;
public class TupleGenerateCacheOperator<T> extends POJOTupleGenerateOperator<T>
{
- //one instance of TupleCacheOutputOperator map to one
+ //one instance of TupleCacheOutputOperator map to one
private static Map< String, List<?> > emittedTuplesMap = new HashMap< String, List<?>>();
private String uuid;
-
+
public TupleGenerateCacheOperator()
{
uuid = java.util.UUID.randomUUID().toString();
}
-
+
@SuppressWarnings("unchecked")
protected void tupleEmitted( T tuple )
{
@@ -46,7 +46,7 @@ public class TupleGenerateCacheOperator<T> extends POJOTupleGenerateOperator<T>
}
emittedTuples.add(tuple);
}
-
+
@SuppressWarnings("unchecked")
public List<T> getTuples()
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerator.java b/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerator.java
index cea81d5..844d31d 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerator.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerator.java
@@ -25,26 +25,26 @@ import org.slf4j.LoggerFactory;
/**
* This is a copy from contrib, should be merged later.
- *
+ *
*/
public class TupleGenerator<T>
{
private static final Logger logger = LoggerFactory.getLogger( TupleGenerator.class );
-
+
private volatile long rowId = 0;
private Constructor<T> constructor;
-
+
private static Class<?>[] paramTypes = new Class<?>[]{ Long.class, long.class, Integer.class, int.class };
-
+
public TupleGenerator()
{
}
-
+
public TupleGenerator( Class<T> tupleClass )
{
useTupleClass( tupleClass );
}
-
+
public void useTupleClass( Class<T> tupleClass )
{
for( Class<?> paramType : paramTypes )
@@ -59,7 +59,7 @@ public class TupleGenerator<T>
throw new RuntimeException( "Not found proper constructor." );
}
}
-
+
protected Constructor<T> tryGetConstructor( Class<T> tupleClass, Class<?> parameterType )
{
try
@@ -71,17 +71,17 @@ public class TupleGenerator<T>
return null;
}
}
-
+
public void reset()
{
rowId = 0;
}
-
+
public T getNextTuple()
{
if( constructor == null )
throw new RuntimeException( "Not found proper constructor." );
-
+
long curRowId = ++rowId;
try
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java
index 3538891..1bb599a 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java
@@ -96,7 +96,7 @@ public class ZeroMQInputOperatorTest
Thread.sleep(10);
} else {
break;
- }
+ }
}
}
catch (InterruptedException ex) {
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java
index 27cd278..6155ec3 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java
@@ -51,7 +51,7 @@ class ZeroMQMessageGenerator {
public void send(Object message)
{
- String msg = message.toString();
+ String msg = message.toString();
publisher.send(msg.getBytes(), 0);
}
@@ -72,8 +72,8 @@ class ZeroMQMessageGenerator {
ArrayList<HashMap<String, Integer>> dataMaps = MessageQueueTestHelper.getMessages();
for(int j =0; j < dataMaps.size(); j++)
{
- send(dataMaps.get(j));
- }
+ send(dataMaps.get(j));
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java
index f472828..bb04817 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java
@@ -56,9 +56,9 @@ final class ZeroMQMessageReceiver implements Runnable
@Override
public void run()
{
- logger.debug("receiver running");
+ logger.debug("receiver running");
while (!Thread.currentThread().isInterrupted() && !shutDown) {
- //logger.debug("receiver running in loop");
+ //logger.debug("receiver running in loop");
byte[] msg = subscriber.recv(ZMQ.NOBLOCK);
// convert to HashMap and save the values for each key
// then expect c to be 1000, b=20, a=2
@@ -68,7 +68,7 @@ final class ZeroMQMessageReceiver implements Runnable
continue;
}
String str = new String(msg);
-
+
if (str.indexOf("{") == -1) {
continue;
}
@@ -85,7 +85,7 @@ final class ZeroMQMessageReceiver implements Runnable
public void teardown()
{
shutDown=true;
-
+
syncclient.close();
subscriber.close();
context.term();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java
index b8332c0..41c5248 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java
@@ -47,7 +47,7 @@ public class ZeroMQOutputOperatorTest
final int testNum = 3;
runTest(testNum);
-
+
logger.debug("end of test");
}
@@ -60,7 +60,7 @@ public class ZeroMQOutputOperatorTest
collector.setUrl("tcp://*:5556");
collector.setSyncUrl("tcp://*:5557");
collector.setSUBSCRIBERS_EXPECTED(1);
-
+
dag.addStream("Stream", source.outPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
final LocalMode.Controller lc = lma.getController();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java
index 762d322..ce5ba33 100644
--- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java
@@ -41,11 +41,11 @@ public class FullOuterJoinOperatorTest
CollectorTestSink sink = new CollectorTestSink();
oper.outport.setSink(sink);
- // set column join condition
+ // set column join condition
Condition cond = new JoinColumnEqualCondition("a", "a");
oper.setJoinCondition(cond);
-
- // add columns
+
+ // add columns
oper.selectTable1Column(new ColumnIndex("b", null));
oper.selectTable2Column(new ColumnIndex("c", null));
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java
index 6ad818e..0d2a2f5 100644
--- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java
@@ -85,7 +85,7 @@ public class GroupByOperatorTest
tuple.put("b", 2);
tuple.put("c", 7);
oper.inport.process(tuple);
-
+
oper.endWindow();
oper.teardown();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java
index 5b696f1..3c685ab 100644
--- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java
@@ -87,7 +87,7 @@ public class HavingOperatorTest
tuple.put("b", 2);
tuple.put("c", 7);
oper.inport.process(tuple);
-
+
oper.endWindow();
oper.teardown();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java
index 8b4f923..18312d1 100644
--- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java
@@ -30,7 +30,7 @@ import com.datatorrent.lib.streamquery.index.ColumnIndex;
import com.datatorrent.lib.testbench.CollectorTestSink;
/**
- *
+ *
* Functional test for {@link com.datatorrent.lib.streamquery.InnerJoinOperator }.
* @deprecated
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java
index f78ba21..eb1ec6d 100644
--- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java
@@ -41,11 +41,11 @@ public class LeftOuterJoinOperatorTest
CollectorTestSink sink = new CollectorTestSink();
oper.outport.setSink(sink);
- // set column join condition
+ // set column join condition
Condition cond = new JoinColumnEqualCondition("a", "a");
oper.setJoinCondition(cond);
-
- // add columns
+
+ // add columns
oper.selectTable1Column(new ColumnIndex("b", null));
oper.selectTable2Column(new ColumnIndex("c", null));
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java
index 8142276..70bc031 100644
--- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java
@@ -42,11 +42,11 @@ public class RightOuterJoinOperatorTest
CollectorTestSink sink = new CollectorTestSink();
oper.outport.setSink(sink);
- // set column join condition
+ // set column join condition
Condition cond = new JoinColumnEqualCondition("a", "a");
oper.setJoinCondition(cond);
-
- // add columns
+
+ // add columns
oper.selectTable1Column(new ColumnIndex("b", null));
oper.selectTable2Column(new ColumnIndex("c", null));
@@ -83,7 +83,7 @@ public class RightOuterJoinOperatorTest
tuple.put("b", 11);
tuple.put("c", 12);
oper.inport2.process(tuple);
-
+
oper.endWindow();
oper.teardown();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java
index 90480cf..4b609c1 100644
--- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java
@@ -37,20 +37,20 @@ public class SelectTopOperatorTest
oper.setTopValue(2);
CollectorTestSink sink = new CollectorTestSink();
oper.outport.setSink(sink);
-
+
oper.beginWindow(1);
HashMap<String, Object> tuple = new HashMap<String, Object>();
tuple.put("a", 0);
tuple.put("b", 1);
tuple.put("c", 2);
oper.inport.process(tuple);
-
+
tuple = new HashMap<String, Object>();
tuple.put("a", 1);
tuple.put("b", 3);
tuple.put("c", 4);
oper.inport.process(tuple);
-
+
tuple = new HashMap<String, Object>();
tuple.put("a", 1);
tuple.put("b", 5);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java
index 01465db..568aed9 100644
--- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java
@@ -79,7 +79,7 @@ public class BetweenConditionTest
tuple.put("b", 7);
tuple.put("c", 8);
oper.inport.process(tuple);
-
+
oper.endWindow();
oper.teardown();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java
index e160e5d..929d134 100644
--- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java
@@ -84,7 +84,7 @@ public class CompoundConditionTest
tuple.put("b", 7);
tuple.put("c", 8);
oper.inport.process(tuple);
-
+
oper.endWindow();
oper.teardown();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java
index d641a1c..255389b 100644
--- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java
@@ -81,7 +81,7 @@ public class InConditionTest
tuple.put("b", 7);
tuple.put("c", 8);
oper.inport.process(tuple);
-
+
oper.endWindow();
oper.teardown();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java
index b9132d8..3c74cc5 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java
@@ -45,7 +45,7 @@ public class AverageData
/**
* This constructor takes the value of sum and count and initialize the local attributes to corresponding values
- *
+ *
* @param count
* the value of count
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java
index a5dda7e..6f02a24 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java
@@ -42,7 +42,7 @@ public class MachineInfo
/**
* This constructor takes MachineKey as input and initialize local attributes
- *
+ *
* @param machineKey
* the MachineKey instance
*/
@@ -53,7 +53,7 @@ public class MachineInfo
/**
* This constructor takes MachineKey, cpu usage, ram usage, hdd usage as input and initialize local attributes
- *
+ *
* @param machineKey
* the MachineKey instance
* @param cpu
@@ -73,7 +73,7 @@ public class MachineInfo
/**
* This method returns the MachineKey
- *
+ *
* @return
*/
public MachineKey getMachineKey()
@@ -83,7 +83,7 @@ public class MachineInfo
/**
* This method sets the MachineKey
- *
+ *
* @param machineKey
* the MachineKey instance
*/
@@ -94,7 +94,7 @@ public class MachineInfo
/**
* This method returns the CPU% usage
- *
+ *
* @return
*/
public int getCpu()
@@ -104,7 +104,7 @@ public class MachineInfo
/**
* This method sets the CPU% usage
- *
+ *
* @param cpu
* the CPU% usage
*/
@@ -115,7 +115,7 @@ public class MachineInfo
/**
* This method returns the RAM% usage
- *
+ *
* @return
*/
public int getRam()
@@ -125,7 +125,7 @@ public class MachineInfo
/**
* This method sets the RAM% usage
- *
+ *
* @param ram
* the RAM% usage
*/
@@ -136,7 +136,7 @@ public class MachineInfo
/**
* This method returns the HDD% usage
- *
+ *
* @return
*/
public int getHdd()
@@ -146,7 +146,7 @@ public class MachineInfo
/**
* This method sets the HDD% usage
- *
+ *
* @param hdd
* the HDD% usage
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java
----------------------------------------------------------------------
diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java
index 8964d84..f6708ba 100644
--- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java
+++ b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java
@@ -59,8 +59,8 @@ public class PhoneEntryOperator extends BaseOperator
/**
* Sets the initial number of phones to display on the google map.
- *
- * @param i the count of initial phone numbers to display
+ *
+ * @param i the count of initial phone numbers to display
*/
public void setInitialDisplayCount(int i)
{
@@ -69,8 +69,8 @@ public class PhoneEntryOperator extends BaseOperator
/**
* Sets the range for the phone numbers generated by the operator.
- *
- * @param i the range within which the phone numbers are randomly generated.
+ *
+ * @param i the range within which the phone numbers are randomly generated.
*/
public void setPhoneRange(Range<Integer> phoneRange)
{
@@ -80,7 +80,7 @@ public class PhoneEntryOperator extends BaseOperator
/**
* Sets the max seed for random phone number generation
- *
+ *
* @param i the number to initialize the random number phone generator.
*/
public void setMaxSeedPhoneNumber(int number)