You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2016/09/27 16:41:18 UTC
[4/6] apex-malhar git commit: Fix trailing whitespace.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java b/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java
index 8e12fcc..21079d7 100644
--- a/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java
+++ b/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java
@@ -30,7 +30,7 @@ public class CalculatorTest
{
@Test
public void testSomeMethod() throws Exception
- {
+ {
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(false);
conf.addResource("dt-site-pilibrary.xml");
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
----------------------------------------------------------------------
diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
index 6360768..3c9c4da 100755
--- a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
+++ b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
@@ -210,7 +210,7 @@ public abstract class AbstractFSRollingOutputOperator<T> extends AbstractFileOut
* written to. Example: If hive partitions are date='2014-12-12',country='USA'
* then this method returns {"2014-12-12","USA"} The implementation is left to
* the user.
- *
+ *
* @param tuple
* A received tuple to be written to a hive partition.
* @return ArrayList containing hive partition values.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java
----------------------------------------------------------------------
diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java
index 8e3b143..ed4ca85 100755
--- a/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java
+++ b/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java
@@ -61,7 +61,7 @@ public class HiveOperator extends AbstractStoreOutputOperator<FilePartitionMappi
/**
* Hive store.
- *
+ *
* @deprecated use {@link AbstractStoreOutputOperator#store} instead
*/
@Deprecated
@@ -226,7 +226,7 @@ public class HiveOperator extends AbstractStoreOutputOperator<FilePartitionMappi
/**
* Get the partition columns in hive to which data needs to be loaded.
- *
+ *
* @return List of Hive Partition Columns
*/
public ArrayList<String> getHivePartitionColumns()
@@ -236,7 +236,7 @@ public class HiveOperator extends AbstractStoreOutputOperator<FilePartitionMappi
/**
* Set the hive partition columns to which data needs to be loaded.
- *
+ *
* @param hivePartitionColumns
*/
public void setHivePartitionColumns(ArrayList<String> hivePartitionColumns)
@@ -246,7 +246,7 @@ public class HiveOperator extends AbstractStoreOutputOperator<FilePartitionMappi
/**
* Get the table name in hive.
- *
+ *
* @return table name
*/
public String getTablename()
@@ -256,7 +256,7 @@ public class HiveOperator extends AbstractStoreOutputOperator<FilePartitionMappi
/**
* Set the table name in hive.
- *
+ *
* @param tablename
*/
public void setTablename(String tablename)
@@ -266,7 +266,7 @@ public class HiveOperator extends AbstractStoreOutputOperator<FilePartitionMappi
/**
* Gets the store set for hive;
- *
+ *
* @deprecated use {@link #getStore()} instead.
* @return hive store
*/
@@ -278,7 +278,7 @@ public class HiveOperator extends AbstractStoreOutputOperator<FilePartitionMappi
/**
* Set the store in hive.
- *
+ *
* @deprecated use {@link #setStore()} instead.
* @param hivestore
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java
----------------------------------------------------------------------
diff --git a/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java b/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java
index 3491b3c..d859634 100644
--- a/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java
+++ b/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java
@@ -164,7 +164,7 @@ public class HiveOutputModule implements Module
/**
* The path of the directory to where files are written.
- *
+ *
* @return file path
*/
public String getFilePath()
@@ -174,7 +174,7 @@ public class HiveOutputModule implements Module
/**
* The path of the directory to where files are written.
- *
+ *
* @param filePath
* file path
*/
@@ -185,7 +185,7 @@ public class HiveOutputModule implements Module
/**
* Names of the columns in hive table (excluding partitioning columns).
- *
+ *
* @return Hive column names
*/
public String[] getHiveColumns()
@@ -195,7 +195,7 @@ public class HiveOutputModule implements Module
/**
* Names of the columns in hive table (excluding partitioning columns).
- *
+ *
* @param hiveColumns
* Hive column names
*/
@@ -207,7 +207,7 @@ public class HiveOutputModule implements Module
/**
* Data types of the columns in hive table (excluding partitioning columns).
* This sequence should match to the fields in hiveColumnDataTypes
- *
+ *
* @return Hive column data types
*/
public FIELD_TYPE[] getHiveColumnDataTypes()
@@ -218,7 +218,7 @@ public class HiveOutputModule implements Module
/**
* Data types of the columns in hive table (excluding partitioning columns).
* This sequence should match to the fields in hiveColumnDataTypes *
- *
+ *
* @param hiveColumnDataTypes
* Hive column data types
*/
@@ -230,7 +230,7 @@ public class HiveOutputModule implements Module
/**
* Expressions for the hive columns (excluding partitioning columns). This
* sequence should match to the fields in hiveColumnDataTypes
- *
+ *
* @return
*/
public String[] getExpressionsForHiveColumns()
@@ -241,7 +241,7 @@ public class HiveOutputModule implements Module
/**
* Expressions for the hive columns (excluding partitioning columns). This
* sequence should match to the fields in hiveColumnDataTypes
- *
+ *
* @param expressionsForHiveColumns
*/
public void setExpressionsForHiveColumns(String[] expressionsForHiveColumns)
@@ -251,7 +251,7 @@ public class HiveOutputModule implements Module
/**
* Names of the columns on which hive data should be partitioned
- *
+ *
* @return hive partition columns
*/
public String[] getHivePartitionColumns()
@@ -261,7 +261,7 @@ public class HiveOutputModule implements Module
/**
* Names of the columns on which hive data should be partitioned
- *
+ *
* @param hivePartitionColumns
* Hive partition columns
*/
@@ -273,7 +273,7 @@ public class HiveOutputModule implements Module
/**
* Data types of the columns on which hive data should be partitioned. This
* sequence should match to the fields in hivePartitionColumns
- *
+ *
* @return Hive partition column data types
*/
public FIELD_TYPE[] getHivePartitionColumnDataTypes()
@@ -284,7 +284,7 @@ public class HiveOutputModule implements Module
/**
* Data types of the columns on which hive data should be partitioned. This
* sequence should match to the fields in hivePartitionColumns
- *
+ *
* @param hivePartitionColumnDataTypes
* Hive partition column data types
*/
@@ -296,7 +296,7 @@ public class HiveOutputModule implements Module
/**
* Expressions for the hive partition columns. This sequence should match to
* the fields in hivePartitionColumns
- *
+ *
* @return Expressions for hive partition columns
*/
public String[] getExpressionsForHivePartitionColumns()
@@ -307,7 +307,7 @@ public class HiveOutputModule implements Module
/**
* Expressions for the hive partition columns. This sequence should match to
* the fields in hivePartitionColumns
- *
+ *
* @param expressionsForHivePartitionColumns
* Expressions for hive partition columns
*/
@@ -318,7 +318,7 @@ public class HiveOutputModule implements Module
/**
* The maximum length in bytes of a rolling file.
- *
+ *
* @return maximum size of file
*/
public Long getMaxLength()
@@ -328,7 +328,7 @@ public class HiveOutputModule implements Module
/**
* The maximum length in bytes of a rolling file.
- *
+ *
* @param maxLength
* maximum size of file
*/
@@ -339,7 +339,7 @@ public class HiveOutputModule implements Module
/**
* Connection URL for connecting to hive.
- *
+ *
* @return database url
*/
public String getDatabaseUrl()
@@ -349,7 +349,7 @@ public class HiveOutputModule implements Module
/**
* Connection URL for connecting to hive.
- *
+ *
* @param databaseUrl
* database url
*/
@@ -360,7 +360,7 @@ public class HiveOutputModule implements Module
/**
* Driver for connecting to hive.
- *
+ *
* @return database driver
*/
public String getDatabaseDriver()
@@ -370,7 +370,7 @@ public class HiveOutputModule implements Module
/**
* Driver for connecting to hive.
- *
+ *
* @param databaseDriver
* database driver
*/
@@ -381,7 +381,7 @@ public class HiveOutputModule implements Module
/**
* Username for connecting to hive
- *
+ *
* @return user name
*/
public String getUserName()
@@ -391,7 +391,7 @@ public class HiveOutputModule implements Module
/**
* Username for connecting to hive
- *
+ *
* @param username
* user name
*/
@@ -402,7 +402,7 @@ public class HiveOutputModule implements Module
/**
* Password for connecting to hive
- *
+ *
* @return password
*/
public String getPassword()
@@ -412,7 +412,7 @@ public class HiveOutputModule implements Module
/**
* Password for connecting to hive
- *
+ *
* @param password
* password
*/
@@ -423,7 +423,7 @@ public class HiveOutputModule implements Module
/**
* Table name for writing data into hive
- *
+ *
* @return table name
*/
public String getTablename()
@@ -433,7 +433,7 @@ public class HiveOutputModule implements Module
/**
* Table name for writing data into hive
- *
+ *
* @param tablename
* table name
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index 772399d..ad5c3fa 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -110,20 +110,20 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
metadata.get(clusters[i]).put(topic, ptis);
break;
}
-
+
logger.warn("Partition metadata for topic {} is null. retrying...", topic);
-
+
} catch (Exception e) {
logger.warn("Got Exception when trying get partition info for topic {}.", topic, e);
}
-
+
try {
Thread.sleep(100);
} catch (Exception e1) {
//ignore
}
} //end while
-
+
if (tryTime == 0) {
throw new RuntimeException("Get partition info for topic completely failed. Please check the log file. topic name: " + topic);
}
@@ -183,8 +183,8 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
}
metadataRefreshClients = null;
}
-
-
+
+
@Override
public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> map)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
index 143a5bd..fa4856e 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
@@ -228,7 +228,7 @@ public class KafkaConsumerWrapper implements Closeable
}
}
}
-
+
protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e, KafkaConsumer<byte[], byte[]> consumer)
{
// if initialOffset is set to EARLIST or LATEST
@@ -244,7 +244,7 @@ public class KafkaConsumerWrapper implements Closeable
} else {
consumer.seekToEnd(e.partitions().toArray(new TopicPartition[0]));
}
-
+
}
/**
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index 8440615..4e97d72 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -66,7 +66,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
private String partition = null;
private String testName = "";
-
+
public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator;
public class KafkaTestInfo extends TestWatcher
@@ -86,11 +86,11 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
this.desc = description;
}
}
-
+
@Rule
public final KafkaTestInfo testInfo = new KafkaTestInfo();
-
-
+
+
@Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
public static Collection<Object[]> testScenario()
{
@@ -116,7 +116,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
tupleCollection.clear();
//reset count for next new test case
k = 0;
-
+
createTopic(0, testName);
if (hasMultiCluster) {
createTopic(1, testName);
@@ -146,14 +146,14 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
private static final int failureTrigger = 3 * scale;
private static final int tuplesPerWindow = 5 * scale;
private static final int waitTime = 60000 + 300 * scale;
-
- //This latch was used to count the END_TUPLE, but the order of tuple can't be guaranteed,
+
+ //This latch was used to count the END_TUPLE, but the order of tuple can't be guaranteed,
//so, count valid tuple instead.
private static CountDownLatch latch;
private static boolean hasFailure = false;
private static int k = 0;
private static Thread monitorThread;
-
+
/**
* Test Operator to collect tuples from KafkaSingleInputStringOperator.
*
@@ -179,7 +179,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
transient List<String> windowTupleCollector = Lists.newArrayList();
private transient Map<String, List<String>> tupleCollectedInWindow = new HashMap<>();
private int endTuples = 0;
-
+
@Override
public void setup(Context.OperatorContext context)
{
@@ -196,7 +196,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
endTuples = 0;
}
-
+
public void processTuple(byte[] bt)
{
String tuple = new String(bt);
@@ -208,10 +208,10 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
if (tuple.startsWith(KafkaOperatorTestBase.END_TUPLE)) {
endTuples++;
}
-
+
windowTupleCollector.add(tuple);
}
-
+
@Override
public void endWindow()
{
@@ -231,7 +231,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
//discard the tuples of this window if except happened
int tupleSize = windowTupleCollector.size();
tupleCollection.addAll(windowTupleCollector);
-
+
int countDownTupleSize = countDownAll ? tupleSize : endTuples;
if (latch != null) {
@@ -303,8 +303,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
// each broker should get a END_TUPLE message
latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers);
- logger.info("Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {}; hasMultiPartition: {}, partition: {}",
- testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition);
+ logger.info("Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {}; hasMultiPartition: {}, partition: {}",
+ testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition);
// Start producer
KafkaTestProducer p = new KafkaTestProducer(testName, hasMultiPartition, hasMultiCluster);
@@ -313,7 +313,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
t.start();
int expectedReceiveCount = totalCount + totalBrokers;
-
+
// Create DAG for testing.
LocalMode lma = LocalMode.newInstance();
DAG dag = lma.getDAG();
@@ -346,7 +346,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
LocalMode.Controller lc = lma.getController();
lc.setHeartbeatMonitoringEnabled(false);
- //let the Controller to run the inside another thread. It is almost same as call Controller.runAsync(),
+ //let the Controller to run the inside another thread. It is almost same as call Controller.runAsync(),
//but Controller.runAsync() don't expose the thread which run it, so we don't know when the thread will be terminated.
//create this thread and then call join() to make sure the Controller shutdown completely.
monitorThread = new Thread((StramLocalCluster)lc, "master");
@@ -363,9 +363,9 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
} catch (Exception e) {
logger.warn(e.getMessage());
}
-
+
t.join();
-
+
if (!notTimeout || expectedReceiveCount != tupleCollection.size()) {
logger.info("Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", tupleCollection.size(),
expectedReceiveCount, testName, tupleCollection);
@@ -373,13 +373,13 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: " + tupleCollection, notTimeout);
// Check results
- Assert.assertTrue( "testName: " + testName + "; Collected tuple size: " + tupleCollection.size() + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection,
+ Assert.assertTrue( "testName: " + testName + "; Collected tuple size: " + tupleCollection.size() + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection,
expectedReceiveCount == tupleCollection.size());
-
+
logger.info("End of test case: {}", testName);
}
-
+
private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag)
{
operator.setHoldingBufferSize(5000);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
index e6256f1..21f8977 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
@@ -34,7 +34,7 @@ import kafka.utils.VerifiableProperties;
public class KafkaTestPartitioner implements Partitioner
{
public KafkaTestPartitioner(VerifiableProperties props) {
-
+
}
public KafkaTestPartitioner() {
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
index 2f24a8a..ca6cc98 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
@@ -106,7 +106,7 @@ public class KafkaTestProducer implements Runnable
}
private transient List<Future<RecordMetadata>> sendTasks = Lists.newArrayList();
-
+
private void generateMessages()
{
// Create dummy message
@@ -140,12 +140,12 @@ public class KafkaTestProducer implements Runnable
sendTasks.add(producer.send(new ProducerRecord<>(topic, "", msg)));
}
}
-
+
producer.flush();
if (producer1!=null) {
producer1.flush();
}
-
+
try {
for (Future<RecordMetadata> task : sendTasks) {
task.get(30, TimeUnit.SECONDS);
@@ -153,7 +153,7 @@ public class KafkaTestProducer implements Runnable
} catch (Exception e) {
throw new RuntimeException(e);
}
-
+
close();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/appdata/gpo/SerdeMapPrimitive.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/gpo/SerdeMapPrimitive.java b/library/src/main/java/com/datatorrent/lib/appdata/gpo/SerdeMapPrimitive.java
index cd61e20..8c8b83a 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/gpo/SerdeMapPrimitive.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/gpo/SerdeMapPrimitive.java
@@ -74,7 +74,7 @@ public class SerdeMapPrimitive implements Serde
GPOType gpoType = GPOType.GPO_TYPE_ARRAY[type.ordinal()];
bytes.add(gpoType.serialize(object));
}
-
+
@Override
public synchronized Object deserializeObject(byte[] objectBytes, MutableInt offset)
{
@@ -87,7 +87,7 @@ public class SerdeMapPrimitive implements Serde
int typeOrdinal = GPOUtils.deserializeInt(objectBytes, offset);
GPOType gpoType = GPOType.GPO_TYPE_ARRAY[typeOrdinal];
Object key = gpoType.deserialize(objectBytes, offset);
-
+
typeOrdinal = GPOUtils.deserializeInt(objectBytes, offset);
gpoType = GPOType.GPO_TYPE_ARRAY[typeOrdinal];
Object value = gpoType.deserialize(objectBytes, offset);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
index 83e8634..4d631c3 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
@@ -104,7 +104,7 @@ public class WindowBoundedService implements Component<OperatorContext>
mutex.release();
executorThread.shutdown();
-
+
try {
executorThread.awaitTermination(10000L + executeIntervalMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java
index 2333dbb..59625f9 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java
@@ -2201,7 +2201,7 @@ public class DimensionalConfigurationSchema
{
return getDimensionsDescriptorIDToIncrementalAggregatorIDs();
}
-
+
public List<IntArrayList> getDimensionsDescriptorIDToCompositeAggregatorIDs()
{
return dimensionsDescriptorIDToCompositeAggregatorIDs;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
index 6138971..4fef2df 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
@@ -100,7 +100,7 @@ public class DimensionalSchema implements Schema
new Fields(Sets.newHashSet(FIELD_TIME_FROM, FIELD_TIME_TO)));
public static final String FIELD_RESPONSE_DELAY_MILLS = "responseDelayMillis";
-
+
/**
* The from value for the schema. Null if there is no from value.
*/
@@ -164,7 +164,7 @@ public class DimensionalSchema implements Schema
private int schemaID = Schema.DEFAULT_SCHEMA_ID;
protected long responseDelayMillis;
-
+
/**
* Constructor for serialization
*/
@@ -249,7 +249,7 @@ public class DimensionalSchema implements Schema
long responseDelayMillis)
{
this(schemaStub,
- configurationSchema,
+ configurationSchema,
responseDelayMillis);
this.schemaID = schemaID;
}
@@ -391,7 +391,7 @@ public class DimensionalSchema implements Schema
schema.put(SnapshotSchema.FIELD_SCHEMA_TYPE, DimensionalSchema.SCHEMA_TYPE);
schema.put(SnapshotSchema.FIELD_SCHEMA_VERSION, DimensionalSchema.SCHEMA_VERSION);
-
+
//responseDelayMillis
if (responseDelayMillis > 0) {
schema.put(FIELD_RESPONSE_DELAY_MILLS, responseDelayMillis);
@@ -459,10 +459,10 @@ public class DimensionalSchema implements Schema
for (int combinationID = 0;
combinationID < configurationSchema.getDimensionsDescriptorIDToKeys().size();
combinationID++) {
-
+
//TODO: the auto-generated combination for computation of composite aggregator will be added.
//should remove it.
-
+
Fields fields = configurationSchema.getDimensionsDescriptorIDToKeys().get(combinationID);
Map<String, Set<String>> fieldToAggregatorAdditionalValues =
configurationSchema.getDimensionsDescriptorIDToFieldToAggregatorAdditionalValues().get(combinationID);
@@ -515,7 +515,7 @@ public class DimensionalSchema implements Schema
combination.put(DimensionalConfigurationSchema.FIELD_DIMENSIONS_ADDITIONAL_VALUES, additionalValueArray);
}
-
+
dimensions.put(combination);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
index 8260c81..4460b51 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
@@ -34,7 +34,7 @@ public interface Schema
public static final String FIELD_SCHEMA_KEYS = "schemaKeys";
public static final String FIELD_SCHEMA = "schema";
public static final String FIELD_SCHEMA_TAGS = "tags";
-
+
/**
* The id of the schema. This is relevant for operators which support serving multiple schemas,
* in which each schema will need a unique id.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
index 5010580..b1e6d36 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
@@ -245,7 +245,7 @@ public class SnapshotSchema implements Schema
schemaJSON = schema.toString();
}
-
+
/**
* This is a helper method which sets the JSON that represents this schema.
* @param schemaJSON The JSON that represents this schema.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
index 0b03e79..19e142b 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
@@ -107,9 +107,9 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
* The queryExecutor execute the query and return the result.
*/
protected QueryExecutor<Query, Void, MutableLong, Result> queryExecutor;
-
+
private Set<String> tags;
-
+
@AppData.QueryPort
@InputPortFieldAnnotation(optional = true)
public final transient DefaultInputPort<String> query = new DefaultInputPort<String>()
@@ -120,7 +120,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
processQuery(queryJSON);
}
};
-
+
/**
* process the query send.
* provide this method to give sub class a chance to override.
@@ -169,7 +169,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
currentData.add(gpoRow);
}
}
-
+
/**
* Create operator.
*/
@@ -199,11 +199,11 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
public void setup(OperatorContext context)
{
setupSchema();
-
+
schemaRegistry = new SchemaRegistrySingle(schema);
//Setup for query processing
setupQueryProcessor();
-
+
queryDeserializerFactory = new MessageDeserializerFactory(SchemaQuery.class,
DataQuerySnapshot.class);
queryDeserializerFactory.setContext(DataQuerySnapshot.class, schemaRegistry);
@@ -228,7 +228,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
protected void setupQueryProcessor()
{
- queryProcessor = QueryManagerSynchronous.newInstance(queryExecutor == null ? new SnapshotComputer() : queryExecutor,
+ queryProcessor = QueryManagerSynchronous.newInstance(queryExecutor == null ? new SnapshotComputer() : queryExecutor,
new AppDataWindowEndQueueManager<Query, Void>());
}
@@ -378,6 +378,6 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
{
this.tags = tags;
}
-
-
+
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java b/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java
index 2b503ed..9b9eb8d 100644
--- a/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java
+++ b/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java
@@ -42,7 +42,7 @@ public class BandwidthPartitioner<T extends BandwidthLimitingOperator> extends S
/**
* This constructor is used to create the partitioner from a property.
- *
+ *
* @param value A string which is an integer of the number of partitions to create
*/
public BandwidthPartitioner(String value)
@@ -52,7 +52,7 @@ public class BandwidthPartitioner<T extends BandwidthLimitingOperator> extends S
/**
* This creates a partitioner which creates partitonCount partitions.
- *
+ *
* @param partitionCount The number of partitions to create.
*/
public BandwidthPartitioner(int partitionCount)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/codec/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/codec/package-info.java b/library/src/main/java/com/datatorrent/lib/codec/package-info.java
index ded8689..d876e3f 100644
--- a/library/src/main/java/com/datatorrent/lib/codec/package-info.java
+++ b/library/src/main/java/com/datatorrent/lib/codec/package-info.java
@@ -17,7 +17,7 @@
* under the License.
*/
/**
- * Shared codec implementations.
+ * Shared codec implementations.
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
package com.datatorrent.lib.codec;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/converter/Converter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/converter/Converter.java b/library/src/main/java/com/datatorrent/lib/converter/Converter.java
index ef999e4..3799cd2 100644
--- a/library/src/main/java/com/datatorrent/lib/converter/Converter.java
+++ b/library/src/main/java/com/datatorrent/lib/converter/Converter.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.classification.InterfaceStability;
* Operators that are converting tuples from one format to another must
* implement this interface. Eg. Parsers or formatters , that parse data of
* certain format and convert them to another format.
- *
+ *
* @param <INPUT>
* @param <OUTPUT>
* @since 3.2.0
@@ -35,7 +35,7 @@ public interface Converter<INPUT, OUTPUT>
/**
* Provide the implementation for converting tuples from one format to the
* other
- *
+ *
* @param tuple tuple of certain format
* @return OUTPUT tuple of converted format
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/db/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/KeyValueStore.java b/library/src/main/java/com/datatorrent/lib/db/KeyValueStore.java
index 4211d3d..76759a4 100644
--- a/library/src/main/java/com/datatorrent/lib/db/KeyValueStore.java
+++ b/library/src/main/java/com/datatorrent/lib/db/KeyValueStore.java
@@ -62,7 +62,7 @@ public interface KeyValueStore extends Connectable
/**
* Removes the key and the value given the key
- *
+ *
* @param key
*/
public void remove(Object key);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
index 90111d8..38d44a0 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
@@ -94,7 +94,7 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac
super();
columnFieldGetters = Lists.newArrayList();
}
-
+
protected static class ActiveFieldInfo
{
final FieldInfo fieldInfo;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
index f9fb714..6bd5121 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
@@ -79,11 +79,11 @@ import static org.jooq.impl.DSL.field;
* partitions for fetching the existing data in the table. And an additional
* single partition for polling additive data. Assumption is that there is an
* ordered unique column using which range queries can be formed<br>
- *
+ *
* Only newly added data will be fetched by the polling jdbc partition, also
* assumption is rows won't be added or deleted in middle during scan.
- *
- *
+ *
+ *
* @displayName Jdbc Polling Input Operator
* @category Input
* @tags database, sql, jdbc, partitionable, idepotent, pollable
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
index d139379..9a76103 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
@@ -33,7 +33,7 @@ import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
/**
* A concrete implementation for {@link AbstractJdbcPollInputOperator} to
* consume data from jdbc store and emit comma separated values <br>
- *
+ *
* @displayName Jdbc Polling Input Operator
* @category Input
* @tags database, sql, jdbc
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
index 7dfe4e9..f11f2bc 100644
--- a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
@@ -32,8 +32,8 @@ import org.apache.hadoop.io.file.tfile.TFile.Writer;
/**
* A TFile wrapper with FileAccess API
* <ul>
- * <li>{@link TFileImpl.DefaultTFileImpl} return default TFile {@link Reader} and {@link Writer} for IO operations</li>
- * <li>{@link TFileImpl.DTFileImpl} return DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}(which is faster than default TFile reader) and {@link Writer} for IO operations</li>
+ * <li>{@link TFileImpl.DefaultTFileImpl} return default TFile {@link Reader} and {@link Writer} for IO operations</li>
+ * <li>{@link TFileImpl.DTFileImpl} return DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}(which is faster than default TFile reader) and {@link Writer} for IO operations</li>
* </ul>
*
* @since 2.0.0
@@ -44,16 +44,16 @@ public abstract class TFileImpl extends FileAccessFSImpl
private int minBlockSize = 64 * 1024;
private String compressName = TFile.COMPRESSION_NONE;
-
+
private String comparator = "memcmp";
-
+
private int chunkSize = 1024 * 1024;
-
+
private int inputBufferSize = 256 * 1024;
-
+
private int outputBufferSize = 256 * 1024;
-
+
private void setupConfig(Configuration conf)
{
conf.set("tfile.io.chunk.size", String.valueOf(chunkSize));
@@ -69,7 +69,7 @@ public abstract class TFileImpl extends FileAccessFSImpl
setupConfig(fs.getConf());
return new TFileWriter(fsdos, minBlockSize, compressName, comparator, fs.getConf());
}
-
+
public int getMinBlockSize()
{
return minBlockSize;
@@ -140,13 +140,13 @@ public abstract class TFileImpl extends FileAccessFSImpl
{
this.outputBufferSize = outputBufferSize;
}
-
+
/**
* Return {@link TFile} {@link Reader}
*/
public static class DefaultTFileImpl extends TFileImpl
{
-
+
@Override
public FileReader getReader(long bucketKey, String fileName) throws IOException
{
@@ -155,15 +155,15 @@ public abstract class TFileImpl extends FileAccessFSImpl
super.setupConfig(fs.getConf());
return new TFileReader(fsdis, fileLength, fs.getConf());
}
-
+
}
-
+
/**
* Return {@link DTFile} {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}
*/
public static class DTFileImpl extends TFileImpl
{
-
+
@Override
public FileReader getReader(long bucketKey, String fileName) throws IOException
{
@@ -172,7 +172,7 @@ public abstract class TFileImpl extends FileAccessFSImpl
super.setupConfig(fs.getConf());
return new DTFileReader(fsdis, fileLength, fs.getConf());
}
-
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
index 7e9d544..da724d4 100644
--- a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
@@ -34,15 +34,15 @@ import org.apache.hadoop.io.file.tfile.TFile.Writer;
public final class TFileWriter implements FileAccess.FileWriter
{
private Writer writer;
-
+
private FSDataOutputStream fsdos;
-
+
public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName,
String comparator, Configuration conf) throws IOException
{
this.fsdos = stream;
writer = new Writer(stream, minBlockSize, compressName, comparator, conf);
-
+
}
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java b/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java
index 2a54e0f..6fccf1e 100644
--- a/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java
@@ -56,7 +56,7 @@ import com.datatorrent.lib.util.PojoUtils;
* - truePort emits POJOs meeting the given condition
* - falsePort emits POJOs not meeting the given condition
* - error port emits any error situation while evaluating expression
- *
+ *
*
* @since 3.5.0
*/
@@ -234,6 +234,6 @@ public class FilterOperator extends BaseOperator implements Operator.ActivationL
{
return expressionFunctions;
}
-
+
private static final Logger logger = LoggerFactory.getLogger(FilterOperator.class);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
index db8dbc4..ef3c304 100644
--- a/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
+++ b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
@@ -40,7 +40,7 @@ import com.datatorrent.lib.converter.Converter;
* <b>err</b>: emits <Object> error port that emits input tuple that could
* not be converted<br>
* <br>
- *
+ *
* @displayName Parser
* @tags parser converter
* @param <OUTPUT>
@@ -99,7 +99,7 @@ public abstract class Formatter<OUTPUT> extends BaseOperator implements Converte
/**
* Get the class that needs to be formatted
- *
+ *
* @return Class<?>
*/
public Class<?> getClazz()
@@ -109,7 +109,7 @@ public abstract class Formatter<OUTPUT> extends BaseOperator implements Converte
/**
* Set the class of tuple that needs to be formatted
- *
+ *
* @param clazz
*/
public void setClazz(Class<?> clazz)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java b/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
index 840b550..a784f89 100644
--- a/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
+++ b/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
@@ -29,7 +29,7 @@ import com.datatorrent.api.Context.OperatorContext;
/**
* Operator that converts POJO to JSON string <br>
- *
+ *
* @displayName JsonFormatter
* @category Formatter
* @tags pojo json formatter
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java b/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java
index 21a7b6a..78dc344 100644
--- a/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java
+++ b/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java
@@ -88,7 +88,7 @@ public class XmlFormatter extends Formatter<String>
* Gets the alias This is an optional step. Without it XStream would work
* fine, but the XML element names would contain the fully qualified name of
* each class (including package) which would bulk up the XML a bit.
- *
+ *
* @return alias.
*/
public String getAlias()
@@ -100,7 +100,7 @@ public class XmlFormatter extends Formatter<String>
* Sets the alias This is an optional step. Without it XStream would work
* fine, but the XML element names would contain the fully qualified name of
* each class (including package) which would bulk up the XML a bit.
- *
+ *
* @param alias
* .
*/
@@ -112,7 +112,7 @@ public class XmlFormatter extends Formatter<String>
/**
* Gets the date format e.g dd/mm/yyyy - this will be how a date would be
* formatted
- *
+ *
* @return dateFormat.
*/
public String getDateFormat()
@@ -123,7 +123,7 @@ public class XmlFormatter extends Formatter<String>
/**
* Sets the date format e.g dd/mm/yyyy - this will be how a date would be
* formatted
- *
+ *
* @param dateFormat
* .
*/
@@ -134,7 +134,7 @@ public class XmlFormatter extends Formatter<String>
/**
* Returns true if pretty print is enabled.
- *
+ *
* @return prettyPrint
*/
public boolean isPrettyPrint()
@@ -144,7 +144,7 @@ public class XmlFormatter extends Formatter<String>
/**
* Sets pretty print option.
- *
+ *
* @param prettyPrint
*/
public void setPrettyPrint(boolean prettyPrint)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java
index 1401100..16d220c 100644
--- a/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java
@@ -174,7 +174,7 @@ public abstract class AbstractFTPInputOperator<T> extends AbstractFileInputOpera
/**
* An {@link AbstractFTPInputOperator} that splits file into lines and emits them.
- *
+ *
* @displayName FTP String Input
* @category Input
* @tags ftp
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/block/BlockWriter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockWriter.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockWriter.java
index 9e18e1b..64c066b 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/BlockWriter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockWriter.java
@@ -59,7 +59,7 @@ public class BlockWriter extends AbstractFileOutputOperator<AbstractBlockReader.
* Directory under application directory where blocks gets stored
*/
private String blocksDirectory = DEFAULT_BLOCKS_DIR;
-
+
/**
* List of FileBlockMetadata received in the current window.
*/
@@ -206,7 +206,7 @@ public class BlockWriter extends AbstractFileOutputOperator<AbstractBlockReader.
}
}
}
-
+
/**
* Directory under application directory where blocks gets stored
* @return blocks directory
@@ -215,7 +215,7 @@ public class BlockWriter extends AbstractFileOutputOperator<AbstractBlockReader.
{
return blocksDirectory;
}
-
+
/**
* Directory under application directory where blocks gets stored
* @param blocksDirectory blocks directory
@@ -230,7 +230,7 @@ public class BlockWriter extends AbstractFileOutputOperator<AbstractBlockReader.
{
}
-
+
private static final Logger LOG = LoggerFactory.getLogger(BlockWriter.class);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java b/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
index ad55358..60fd93c 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
@@ -26,7 +26,7 @@ import com.datatorrent.netlet.util.Slice;
*
* @category Input
* @tags fs
- *
+ *
* @since 2.1.0
*/
@StatsListener.DataQueueSize
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
index 7e6bd2f..38c8e96 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
@@ -389,7 +389,7 @@ public abstract class AbstractFileSplitter extends BaseOperator
this.filePath = filePath;
discoverTime = System.currentTimeMillis();
}
-
+
protected FileMetadata(FileMetadata fileMetadata)
{
this();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperator.java
index 3b60d4a..99a8fb6 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperator.java
@@ -44,8 +44,8 @@ public abstract class AbstractSingleFileOutputOperator<INPUT> extends AbstractFi
protected String outputFileName;
/**
- * partitionedFileName string format specifier
- e.g. fileName_physicalPartionId -> %s_%d
+ * partitionedFileName string format specifier
+ e.g. fileName_physicalPartionId -> %s_%d
*/
private String partitionedFileNameformat = "%s_%d";
@@ -105,17 +105,17 @@ public abstract class AbstractSingleFileOutputOperator<INPUT> extends AbstractFi
{
return partitionedFileNameformat;
}
-
+
/**
* @param partitionedFileNameformat
* string format specifier for the partitioned file name. It should have one %s and one %d.
- * e.g. fileName_physicalPartionId -> %s_%d
+ * e.g. fileName_physicalPartionId -> %s_%d
*/
public void setPartitionedFileNameformat(String partitionedFileNameformat)
{
this.partitionedFileNameformat = partitionedFileNameformat;
}
-
+
/**
* @return
* Derived name for file based on physicalPartitionId
@@ -124,5 +124,5 @@ public abstract class AbstractSingleFileOutputOperator<INPUT> extends AbstractFi
{
return partitionedFileName;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/FileMerger.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileMerger.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileMerger.java
index 04aa8cf..db7d7c5 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileMerger.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileMerger.java
@@ -118,7 +118,7 @@ public class FileMerger extends FileStitcher<OutputFileMetadata>
OutputStream outputStream = outputFS.create(partFilePath);
return outputStream;
}
-
+
/**
* Flag to control if existing file with same name should be overwritten
* @return Flag to control if existing file with same name should be overwritten
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/FileStitcher.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileStitcher.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileStitcher.java
index 5f5c717..c4ad9d3 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileStitcher.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileStitcher.java
@@ -48,7 +48,7 @@ import com.datatorrent.lib.io.fs.Synchronizer.StitchedFileMetaData;
* This is generic File Stitcher which can be used to merge data from one or
* more files into single stitched file. StitchedFileMetaData defines
* constituents of the stitched file.
- *
+ *
* This class uses Reconciler to
*
* @since 3.4.0
@@ -75,7 +75,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc
* Path for blocks directory
*/
protected transient String blocksDirectoryPath;
-
+
/**
* Directory under application directory where blocks gets stored
*/
@@ -133,8 +133,8 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc
super.setup(context); // Calling it at the end as the reconciler thread uses resources allocated above.
}
- /*
- * Calls super.endWindow() and sets counters
+ /*
+ * Calls super.endWindow() and sets counters
* @see com.datatorrent.api.BaseOperator#endWindow()
*/
@Override
@@ -146,7 +146,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc
stitchedFileMetaData = doneTuples.peek();
// If a tuple is present in doneTuples, it has to be also present in successful/failed/skipped
// as processCommittedData adds tuple in successful/failed/skipped
- // and then reconciler thread add that in doneTuples
+ // and then reconciler thread add that in doneTuples
if (successfulFiles.contains(stitchedFileMetaData)) {
successfulFiles.remove(stitchedFileMetaData);
LOG.debug("File copy successful: {}", stitchedFileMetaData.getStitchedFileRelativePath());
@@ -167,7 +167,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc
}
/**
- *
+ *
* @return Application FileSystem instance
* @throws IOException
*/
@@ -177,7 +177,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc
}
/**
- *
+ *
* @return Destination FileSystem instance
* @throws IOException
*/
@@ -240,7 +240,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc
/**
* Read data from block files and write to output file. Information about
* which block files should be read is specified in outFileMetadata
- *
+ *
* @param stitchedFileMetaData
* @throws IOException
*/
@@ -287,7 +287,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc
/**
* Writing all Stitch blocks to temporary file
- *
+ *
* @param stitchedFileMetaData
* @throws IOException
* @throws BlockNotFoundException
@@ -312,7 +312,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc
/**
* Moving temp output file to final file
- *
+ *
* @param stitchedFileMetaData
* @throws IOException
*/
@@ -324,7 +324,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc
/**
* Moving temp output file to final file
- *
+ *
* @param tempOutFilePath
* Temporary output file
* @param destination
@@ -351,7 +351,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc
throw new RuntimeException("Unable to move file from " + src + " to " + dst);
}
}
-
+
/**
* Directory under application directory where blocks gets stored
* @return blocks directory
@@ -360,7 +360,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc
{
return blocksDirectory;
}
-
+
/**
* Directory under application directory where blocks gets stored
* @param blocksDirectory blocks directory
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java
index 1029dff..baf2297 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java
@@ -41,7 +41,7 @@ public class FilterStreamCodec
{
filterStream = new GZIPOutputStream(outputStream);
}
-
+
@Override
public void finalizeContext() throws IOException
{
@@ -80,7 +80,7 @@ public class FilterStreamCodec
}
/**
- * This provider is useful when writing to a single output stream so that the same cipher can be reused
+ * This provider is useful when writing to a single output stream so that the same cipher can be reused
*/
public static class CipherSimpleStreamProvider implements FilterStreamProvider<CipherOutputStream, OutputStream>
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java
index 35530a3..dd0393a 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java
@@ -61,7 +61,7 @@ public interface FilterStreamContext<F extends FilterOutputStream>
}
}
-
+
public static class SimpleFilterStreamContext<F extends FilterOutputStream> extends BaseFilterStreamContext<F>
{
public SimpleFilterStreamContext(F filterStream)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java
index 75e6e5f..6debaec 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java
@@ -39,9 +39,9 @@ import com.google.common.collect.Maps;
public interface FilterStreamProvider<F extends FilterOutputStream, S extends OutputStream>
{
public FilterStreamContext<F> getFilterStreamContext(S outputStream) throws IOException;
-
+
public void reclaimFilterStreamContext(FilterStreamContext<F> filterStreamContext);
-
+
abstract class SimpleFilterReusableStreamProvider<F extends FilterOutputStream, S extends OutputStream> implements FilterStreamProvider<F, S>
{
@@ -67,7 +67,7 @@ public interface FilterStreamProvider<F extends FilterOutputStream, S extends Ou
reusableContexts.put(outputStream, filterStreamContext);
}
}
-
+
protected abstract FilterStreamContext<F> createFilterStreamContext(OutputStream outputStream) throws IOException;
}
@@ -78,7 +78,7 @@ public interface FilterStreamProvider<F extends FilterOutputStream, S extends Ou
public static class FilterChainStreamProvider<F extends FilterOutputStream, S extends OutputStream> implements FilterStreamProvider<F, S>
{
private List<FilterStreamProvider<?,?>> streamProviders = new ArrayList<FilterStreamProvider<?, ?>>();
-
+
public Collection<FilterStreamProvider<?,?>> getStreamProviders()
{
return Collections.unmodifiableList(streamProviders);
@@ -88,7 +88,7 @@ public interface FilterStreamProvider<F extends FilterOutputStream, S extends Ou
{
streamProviders.add(streamProvider);
}
-
+
@Override
public FilterStreamContext<F> getFilterStreamContext(S outputStream) throws IOException
{
@@ -120,7 +120,7 @@ public interface FilterStreamProvider<F extends FilterOutputStream, S extends Ou
private class FilterChainStreamContext extends FilterStreamContext.BaseFilterStreamContext
implements FilterStreamContext
{
-
+
private List<FilterStreamContext<?>> streamContexts = new ArrayList<FilterStreamContext<?>>();
public void pushStreamContext(FilterStreamContext<?> streamContext)
@@ -128,7 +128,7 @@ public interface FilterStreamProvider<F extends FilterOutputStream, S extends Ou
streamContexts.add(0, streamContext);
filterStream = streamContext.getFilterStream();
}
-
+
public Collection<FilterStreamContext<?>> getStreamContexts()
{
return Collections.unmodifiableCollection(streamContexts);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileCopyModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileCopyModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileCopyModule.java
index f4d1a38..5fbc580 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileCopyModule.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileCopyModule.java
@@ -37,9 +37,9 @@ import com.datatorrent.netlet.util.Slice;
* copy files from any file system to HDFS. This module supports parallel write
* to multiple blocks of the same file and then stitching those blocks in
* original sequence.
- *
+ *
* Essential operators are wrapped into single component using Module API.
- *
+ *
*
* @since 3.4.0
*/
@@ -108,7 +108,7 @@ public class HDFSFileCopyModule implements Module
/**
* Path of the output directory. Relative path of the files copied will be
* maintained w.r.t. source directory and output directory
- *
+ *
* @return output directory path
*/
public String getOutputDirectoryPath()
@@ -119,7 +119,7 @@ public class HDFSFileCopyModule implements Module
/**
* Path of the output directory. Relative path of the files copied will be
* maintained w.r.t. source directory and output directory
- *
+ *
* @param outputDirectoryPath
* output directory path
*/
@@ -130,7 +130,7 @@ public class HDFSFileCopyModule implements Module
/**
* Flag to control if existing file with same name should be overwritten
- *
+ *
* @return Flag to control if existing file with same name should be
* overwritten
*/
@@ -141,7 +141,7 @@ public class HDFSFileCopyModule implements Module
/**
* Flag to control if existing file with same name should be overwritten
- *
+ *
* @param overwriteOnConflict
* Flag to control if existing file with same name should be
* overwritten
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileMerger.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileMerger.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileMerger.java
index 6f72484..cd2bee3 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileMerger.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileMerger.java
@@ -102,7 +102,7 @@ public class HDFSFileMerger extends FileMerger
/**
* Fast merge using HDFS block concat
- *
+ *
* @param outputFileMetadata
* @throws IOException
*/
@@ -130,7 +130,7 @@ public class HDFSFileMerger extends FileMerger
/**
* Attempt for recovery if block concat is successful but temp file is not
* moved to final file
- *
+ *
* @param outputFileMetadata
* @throws IOException
*/
@@ -179,7 +179,7 @@ public class HDFSFileMerger extends FileMerger
/**
* Checks if fast merge is possible for given settings for blocks directory,
* application file system, block size
- *
+ *
* @param outputFileMetadata
* @throws IOException
* @throws BlockNotFoundException
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/Synchronizer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/Synchronizer.java b/library/src/main/java/com/datatorrent/lib/io/fs/Synchronizer.java
index 8632343..a325a2f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/Synchronizer.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/Synchronizer.java
@@ -115,7 +115,7 @@ public class Synchronizer extends BaseOperator
/**
* Checks if all blocks for given file are received. Sends triggger when all
* blocks are received.
- *
+ *
* @param fileMetadata
* @param receivedBlocksMetadata
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java
index efda6b0..f26ecf4 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java
@@ -55,7 +55,7 @@ public abstract class AbstractJMSSinglePortOutputOperator<T> extends AbstractJMS
{
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(AbstractJMSSinglePortOutputOperator.class);
-
+
/**
* Convert to and send message.
* @param tuple
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
index 772464a..99c2eeb 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
@@ -139,7 +139,7 @@ public class JMSBase
{
this.connectionFactoryProperties = connectionFactoryProperties;
}
-
+
/**
* Get the fully qualified class-name of the connection factory that is used by this
* builder to instantiate the connection factory
@@ -150,7 +150,7 @@ public class JMSBase
{
return connectionFactoryClass;
}
-
+
/**
* Set the fully qualified class-name of the connection factory that is used by this
* builder to instantiate the connection factory
@@ -213,7 +213,7 @@ public class JMSBase
{
return destination;
}
-
+
/**
* gets the connection factory class-name used by the default connection factory builder
*
@@ -244,7 +244,7 @@ public class JMSBase
}
return (DefaultConnectionFactoryBuilder)connectionFactoryBuilder;
}
-
+
/**
* Sets the connection factory class-name used by the default connection factory builder
*
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java b/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java
index 5f09a4b..e403979 100644
--- a/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java
+++ b/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java
@@ -42,7 +42,7 @@ import com.datatorrent.api.DefaultInputPort;
* <b>integerResult</b>: emits Integer<br>
* <b>longResult</b>: emits Long<br>
* <br>
- *
+ *
* @displayName Abstract Aggregate Calculator
* @category Math
* @tags aggregate, collection
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java
index 91cc9ba..f7283e0 100644
--- a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java
+++ b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java
@@ -22,9 +22,9 @@ import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
/**
- * This operator extends the AbstractXmlCartesianProduct operator and implements the node value
+ * This operator extends the AbstractXmlCartesianProduct operator and implements the node value
* as a key value pair of node name and the node's text value.
- *
+ *
* @displayName Abstract XML Key Value Cartesian Product
* @category Math
* @tags cartesian product, xml, multiple products, key value
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/Division.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Division.java b/library/src/main/java/com/datatorrent/lib/math/Division.java
index d05af18..f5a01aa 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Division.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Division.java
@@ -58,17 +58,17 @@ public class Division extends BaseOperator
* Array to store numerator inputs during window.
*/
private ArrayList<Number> numer = new ArrayList<Number>();
-
+
/**
* Array to store denominator input during window.
*/
private ArrayList<Number> denom = new ArrayList<Number>();
-
+
/**
* Number of pair processed in current window.
*/
private int index = 0;
-
+
/**
* Numerator input port.
*/
@@ -112,55 +112,55 @@ public class Division extends BaseOperator
}
}
};
-
+
/**
- * Long quotient output port.
+ * Long quotient output port.
*/
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<Long> longQuotient = new DefaultOutputPort<Long>();
-
+
/**
- * Integer quotient output port.
+ * Integer quotient output port.
*/
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<Integer> integerQuotient = new DefaultOutputPort<Integer>();
-
+
/**
- * Double quotient output port.
+ * Double quotient output port.
*/
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<Double> doubleQuotient = new DefaultOutputPort<Double>();
/**
- * Float quotient output port.
+ * Float quotient output port.
*/
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<Float> floatQuotient = new DefaultOutputPort<Float>();
-
+
/**
- * Long remainder output port.
+ * Long remainder output port.
*/
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<Long> longRemainder = new DefaultOutputPort<Long>();
-
+
/**
- * Integer remainder output port.
+ * Integer remainder output port.
*/
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<Integer> integerRemainder = new DefaultOutputPort<Integer>();
-
+
/**
- * Double remainder output port.
+ * Double remainder output port.
*/
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<Double> doubleRemainder = new DefaultOutputPort<Double>();
-
+
/**
- * Float remainder output port.
+ * Float remainder output port.
*/
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<Float> floatRemainder = new DefaultOutputPort<Float>();
-
+
/**
* Error data output port that emits a string.
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/Margin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Margin.java b/library/src/main/java/com/datatorrent/lib/math/Margin.java
index 94e15d6..5d1872e 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Margin.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Margin.java
@@ -24,7 +24,7 @@ import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.lib.util.BaseNumberValueOperator;
/**
- * This operator sums the division of numerator and denominator value arriving at input ports.
+ * This operator sums the division of numerator and denominator value arriving at input ports.
* <p>
* <br>
* Margin Formula used by this operator: 1 - numerator/denominator.<br>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/MarginMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/MarginMap.java b/library/src/main/java/com/datatorrent/lib/math/MarginMap.java
index 7ef1f81..d1fa33f 100644
--- a/library/src/main/java/com/datatorrent/lib/math/MarginMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/MarginMap.java
@@ -54,7 +54,7 @@ public class MarginMap<K, V extends Number> extends BaseNumberKeyValueOperator<K
{
/**
* Numerator input port that takes a map.
- */
+ */
public final transient DefaultInputPort<Map<K, V>> numerator = new DefaultInputPort<Map<K, V>>()
{
/**
@@ -66,7 +66,7 @@ public class MarginMap<K, V extends Number> extends BaseNumberKeyValueOperator<K
addTuple(tuple, numerators);
}
};
-
+
/**
* Denominator input port that takes a map.
*/
@@ -101,7 +101,7 @@ public class MarginMap<K, V extends Number> extends BaseNumberKeyValueOperator<K
val.add(e.getValue().doubleValue());
}
}
-
+
/*
* Output margin port that emits hashmap.
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/Min.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Min.java b/library/src/main/java/com/datatorrent/lib/math/Min.java
index 4b3fa23..a862f2e 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Min.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Min.java
@@ -45,10 +45,10 @@ public class Min<V extends Number> extends BaseNumberValueOperator<V> implements
* Computed low value.
*/
protected V low;
-
+
// transient field
protected boolean flag = false;
-
+
/**
* Input port that takes a number and compares to min and stores the new min.
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java b/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java
index dd56f7f..5396768 100644
--- a/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java
+++ b/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java
@@ -26,7 +26,7 @@ import com.datatorrent.common.util.BaseOperator;
/**
* Multiplies input tuple (Number) by the value of property "multiplier" and emits the result on respective ports.
* <p>
- * This operator emits the result as Long on port "longProduct", as Integer on port "integerProduct", as Double on port "doubleProduct", and as Float on port "floatProduct".
+ * This operator emits the result as Long on port "longProduct", as Integer on port "integerProduct", as Double on port "doubleProduct", and as Float on port "floatProduct".
* Output is computed in current window.No state dependency among input tuples
* This is a pass through operator
* <br>
@@ -79,22 +79,22 @@ public class MultiplyByConstant extends BaseOperator
}
};
-
+
/**
* Long output port.
*/
public final transient DefaultOutputPort<Long> longProduct = new DefaultOutputPort<Long>();
-
+
/**
* Integer output port.
*/
public final transient DefaultOutputPort<Integer> integerProduct = new DefaultOutputPort<Integer>();
-
+
/**
* Double output port.
*/
public final transient DefaultOutputPort<Double> doubleProduct = new DefaultOutputPort<Double>();
-
+
/**
* Float output port.
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java b/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
index 286d72e..163f06b 100644
--- a/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
+++ b/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
@@ -24,7 +24,7 @@ import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.common.util.BaseOperator;
/**
- * Calculate the running average of the input numbers and emit it at the end of the window.
+ * Calculate the running average of the input numbers and emit it at the end of the window.
* <p>
* This is an end of window operator.<br>
* <br>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/Sigma.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Sigma.java b/library/src/main/java/com/datatorrent/lib/math/Sigma.java
index 6bfb9cf..1a9df60 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Sigma.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Sigma.java
@@ -26,7 +26,7 @@ import com.datatorrent.api.annotation.OperatorAnnotation;
* Adds incoming tuple to the state and emits the result of each addition on the respective ports.
* <p>
* The addition would go on forever.Result is emitted on four different data type ports:floatResult,integerResult,longResult,doubleResult.
- * Input tuple object has to be an implementation of the interface Collection.Tuples are emitted on the output ports only if they are connected.
+ * Input tuple object has to be an implementation of the interface Collection.Tuples are emitted on the output ports only if they are connected.
* This is done to avoid the cost of calling the functions when some ports are not connected.
* This is a stateful pass through operator<br>
* <b>Partitions : </b>, no will yield wrong results, no unifier on output port.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/SingleVariableAbstractCalculus.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/SingleVariableAbstractCalculus.java b/library/src/main/java/com/datatorrent/lib/math/SingleVariableAbstractCalculus.java
index d97b905..9569074 100644
--- a/library/src/main/java/com/datatorrent/lib/math/SingleVariableAbstractCalculus.java
+++ b/library/src/main/java/com/datatorrent/lib/math/SingleVariableAbstractCalculus.java
@@ -23,7 +23,7 @@ import com.datatorrent.api.DefaultInputPort;
/**
* Transforms the input into the output after applying appropriate mathematical function to it and emits result on respective ports.
* <p>
- * Emits the result as Long on port "longResult", as Integer on port "integerResult",as Double on port "doubleResult", and as Float on port "floatResult".
+ * Emits the result as Long on port "longResult", as Integer on port "integerResult",as Double on port "doubleResult", and as Float on port "floatResult".
* This is a pass through operator<br>
* <br>
* <b>Ports</b>:<br>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/Sum.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Sum.java b/library/src/main/java/com/datatorrent/lib/math/Sum.java
index 0f5e64f..0214268 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Sum.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Sum.java
@@ -29,7 +29,7 @@ import com.datatorrent.lib.util.BaseNumberValueOperator;
import com.datatorrent.lib.util.UnifierSumNumber;
/**
- * This operator implements Unifier interface and emits the sum of values at the end of window.
+ * This operator implements Unifier interface and emits the sum of values at the end of window.
* <p>
* This is an end of window operator. Application can turn this into accumulated
* sum operator by setting cumulative flag to true. <br>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java b/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java
index 7f36ef5..cc50fe1 100644
--- a/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java
+++ b/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java
@@ -28,7 +28,7 @@ import com.datatorrent.api.DefaultOutputPort;
/**
* An implementation of the AbstractXmlKeyValueCartesianProduct operator that takes in the xml document
* as a String input and outputs the cartesian product as Strings.
- *
+ *
* @displayName Xml Key Value String Cartesian Product
* @category Math
* @tags cartesian product, string, xml
@@ -38,7 +38,7 @@ public class XmlKeyValueStringCartesianProduct extends AbstractXmlKeyValueCartes
{
InputSource source = new InputSource();
-
+
/**
* Output port that emits cartesian product as Strings.
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/package-info.java b/library/src/main/java/com/datatorrent/lib/math/package-info.java
index f583662..c22309b 100644
--- a/library/src/main/java/com/datatorrent/lib/math/package-info.java
+++ b/library/src/main/java/com/datatorrent/lib/math/package-info.java
@@ -22,10 +22,10 @@
* Most of the arithmetic operators come in three types based on their schema.
* The operators whose names ends with "Map" (eg SumMap, MaxMap, MinMap) take in Map on input ports and emit HashMap. These operators use
* round robin partitioning and would merge as per their functionality.
- * <br>
+ * <br>
* The operators whose names ends with "KeyVal" (eg SumKeyVal, MaxKeyVal, MinKeyVal) take in KeyValPair and emit KeyValPair. These operators use
* sticky key partitioning and would merge using default pass through merge operator.
- * <br>
+ * <br>
* The operators whose names are just their function name (eg Sum, Min, Max) operate on same objects and emit a final result. These operators have no keys.
* They partition in roundrobin and would merge as per their functionality.
* <br>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/parser/Parser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/Parser.java b/library/src/main/java/com/datatorrent/lib/parser/Parser.java
index 4f591f1..0403dc9 100644
--- a/library/src/main/java/com/datatorrent/lib/parser/Parser.java
+++ b/library/src/main/java/com/datatorrent/lib/parser/Parser.java
@@ -40,7 +40,7 @@ import com.datatorrent.lib.converter.Converter;
* <b>err</b>: emits <INPUT> error port that emits input tuple that could
* not be converted<br>
* <br>
- *
+ *
* @displayName Parser
* @tags parser converter
* @param <INPUT>
@@ -108,7 +108,7 @@ public abstract class Parser<INPUT, ERROROUT> extends BaseOperator implements Co
/**
* Get the class that needs to be formatted
- *
+ *
* @return Class<?>
*/
public Class<?> getClazz()
@@ -118,7 +118,7 @@ public abstract class Parser<INPUT, ERROROUT> extends BaseOperator implements Co
/**
* Set the class of tuple that needs to be formatted
- *
+ *
* @param clazz
*/
public void setClazz(Class<?> clazz)