You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2016/09/27 16:41:17 UTC
[3/6] apex-malhar git commit: Fix trailing whitespace.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
index c8eeacc..bdf6fad 100644
--- a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
+++ b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
@@ -54,7 +54,7 @@ import com.datatorrent.netlet.util.DTThrowable;
* the Pojo Class. <br>
* <b>dateFormats</b>: Comma separated string of date formats e.g
* dd/mm/yyyy,dd-mmm-yyyy where first one would be considered default
- *
+ *
* @displayName XmlParser
* @category Parsers
* @tags xml pojo parser
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java b/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java
index 8c22140..6c17529 100644
--- a/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java
@@ -59,7 +59,7 @@ import com.datatorrent.lib.util.PojoUtils;
* - projected port emits POJOs with projected fields from input POJOs
* - remainder port, if connected, emits POJOs with remainder fields from input POJOs
* - error port emits input POJOs as is upon error situations
- *
+ *
* <b>Examples</b>
* For {a, b, c} type of input tuples
* - when selectFields = "" and dropFields = "", projected port shall emit {a, b, c}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java b/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java
index 9532180..d6589ee 100644
--- a/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java
@@ -31,8 +31,8 @@ import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
/**
- * A base implementation of a BaseOperator for language script operator. Subclasses should provide the
- implementation of getting the bindings and process method.
+ * A base implementation of a BaseOperator for language script operator. Subclasses should provide the
+ implementation of getting the bindings and process method.
* Interface for language script operator.
* <p>
* @displayName Script
@@ -55,13 +55,13 @@ public abstract class ScriptOperator extends BaseOperator
}
};
-
+
/**
* Output outBindings port that emits a map of <String, Object>.
*/
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<Map<String, Object>> outBindings = new DefaultOutputPort<Map<String, Object>>();
-
+
/**
* Output result port that emits an object as the result.
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java b/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java
index d206071..a7f5147 100644
--- a/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java
+++ b/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java
@@ -32,10 +32,10 @@ import com.datatorrent.api.StorageAgent;
/**
* Abstract implementation of {@link ApplicationAwareStorageAgent} which can be
* configured with a KeyValue store which is an implementation of {@link StorageAgentKeyValueStore}
- *
+ *
* NOTE - this should be picked from APEX-CORE once the feature below is released
* https://issues.apache.org/jira/browse/APEXCORE-283
- *
+ *
* @param <S>
* Store implementation
*
@@ -71,7 +71,7 @@ public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValu
/**
* Return yarn application id of running application
- *
+ *
* @return
*/
public String getApplicationId()
@@ -81,7 +81,7 @@ public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValu
/**
* Set yarn application id
- *
+ *
* @param applicationId
*/
public void setApplicationId(String applicationId)
@@ -92,7 +92,7 @@ public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValu
/**
* Generates key from operator id and window id to store unique operator
* checkpoints
- *
+ *
* @param operatorId
* @param windowId
* @return unique key for store
@@ -104,14 +104,14 @@ public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValu
/**
* Stores the given operator object in configured store
- *
+ *
* @param object
* Operator object to store
* @param operatorId
* of operator
* @param windowId
* window id of operator to checkpoint
- *
+ *
*/
@Override
public void save(Object object, int operatorId, long windowId) throws IOException
@@ -136,7 +136,7 @@ public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValu
/**
* Retrieves the operator object for given operator & window from configured
* store
- *
+ *
* @param operatorId
* of operator
* @param windowId
@@ -167,7 +167,7 @@ public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValu
/**
* Removes stored operator object for given operatorId & windowId from store
- *
+ *
*/
@Override
public void delete(int operatorId, long windowId) throws IOException
@@ -189,7 +189,7 @@ public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValu
/**
* Returns the list of window ids for the given operator id for which operator objects are
* stored but not removed
- *
+ *
*/
@Override
public long[] getWindowIds(int operatorId) throws IOException
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java b/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java
index 6deed74..5477f4a 100644
--- a/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java
+++ b/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java
@@ -24,7 +24,7 @@ import com.datatorrent.lib.db.KeyValueStore;
/**
* Interface for KeyValue store
- *
+ *
*
* @since 3.4.0
*/
@@ -33,7 +33,7 @@ public interface StorageAgentKeyValueStore extends KeyValueStore
/**
* Get all the keys associated with key
- *
+ *
* @param key
* @return the list of all associated keys
*/
@@ -41,10 +41,10 @@ public interface StorageAgentKeyValueStore extends KeyValueStore
/**
* Set table/region name of store
- *
+ *
* @param tableName
*/
public void setTableName(String tableName);
-
-
+
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/util/TableInfo.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/TableInfo.java b/library/src/main/java/com/datatorrent/lib/util/TableInfo.java
index b0d454d..52bf117 100644
--- a/library/src/main/java/com/datatorrent/lib/util/TableInfo.java
+++ b/library/src/main/java/com/datatorrent/lib/util/TableInfo.java
@@ -66,6 +66,6 @@ public class TableInfo<T extends FieldInfo>
{
this.fieldsInfo = fieldsInfo;
}
-
-
+
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/util/TopNSort.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/TopNSort.java b/library/src/main/java/com/datatorrent/lib/util/TopNSort.java
index ba9cb01..042c75b 100644
--- a/library/src/main/java/com/datatorrent/lib/util/TopNSort.java
+++ b/library/src/main/java/com/datatorrent/lib/util/TopNSort.java
@@ -136,7 +136,7 @@ public class TopNSort<E>
if (list.isEmpty()) {
return list;
}
-
+
Collections.reverse(list);
return list;
//return ret;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/util/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/package-info.java b/library/src/main/java/com/datatorrent/lib/util/package-info.java
index f65e415..7b1140c 100644
--- a/library/src/main/java/com/datatorrent/lib/util/package-info.java
+++ b/library/src/main/java/com/datatorrent/lib/util/package-info.java
@@ -17,7 +17,7 @@
* under the License.
*/
/**
- * Library of shared operators and utilities.
+ * Library of shared operators and utilities.
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
package com.datatorrent.lib.util;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
index 5509ba0..7763103 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
@@ -42,17 +42,17 @@ import com.datatorrent.netlet.util.Slice;
* An implementation for {@link AbstractDeduper} which handles the case of bounded data set.
* This implementation assumes that the incoming tuple does not have a time field, and the de-duplication
* is to be strictly based on the key of the tuple.
- *
+ *
* This implementation uses {@link ManagedTimeStateImpl} for storing the tuple keys on the persistent storage.
- *
+ *
* Following properties need to be configured for the functioning of the operator:
* 1. {@link #keyExpression}: The java expression to extract the key fields in the incoming tuple (POJO)
- * 2. {@link #numBuckets} (optional): The number of buckets that need to be used for storing the keys of the
+ * 2. {@link #numBuckets} (optional): The number of buckets that need to be used for storing the keys of the
* incoming tuples.
- * NOTE: Users can decide upon the proper value for this parameter by guessing the number of distinct keys
+ * NOTE: Users can decide upon the proper value for this parameter by guessing the number of distinct keys
* in the application. An appropriate value would be sqrt(num distinct keys). In case the number of distinct keys is a
* huge number, leave it blank so that the default value of 46340 will be used. The rationale for using this number is
- * that sqrt(max integer) = 46340. This implies that the number of buckets used will roughly be equal to the size of
+ * that sqrt(max integer) = 46340. This implies that the number of buckets used will roughly be equal to the size of
* each bucket, thus spreading the load equally among each bucket.
*
*
@@ -194,10 +194,10 @@ public class BoundedDedupOperator extends AbstractDeduper<Object>
/**
* Sets the number of buckets
- * NOTE: Users can decide upon the proper value for this parameter by guessing the number of distinct keys
+ * NOTE: Users can decide upon the proper value for this parameter by guessing the number of distinct keys
* in the application. An appropriate value would be sqrt(num distinct keys). In case the number of distinct keys is a
* huge number, leave it blank so that the default value of 46340 will be used. The rationale for using this number is
- * that sqrt(max integer) = 46340. This implies that the number of buckets used will roughly be equal to the size of
+ * that sqrt(max integer) = 46340. This implies that the number of buckets used will roughly be equal to the size of
* each bucket, thus spreading the load equally among each bucket.
* @param numBuckets the number of buckets
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregator.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregator.java
index f7ab25d..52ef811 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregator.java
@@ -44,9 +44,9 @@ public abstract class AbstractCompositeAggregator implements CompositeAggregator
//protected int embedAggregatorID;
protected Set<Integer> embedAggregatorDdIds = Sets.newHashSet();
protected Set<String> fields = Sets.newHashSet();
-
+
protected DimensionsConversionContext dimensionsConversionContext;
-
+
public DimensionsConversionContext getDimensionsConversionContext()
{
return dimensionsConversionContext;
@@ -63,7 +63,7 @@ public abstract class AbstractCompositeAggregator implements CompositeAggregator
this.setDimensionsConversionContext(dimensionsConversionContext);
return this;
}
-
+
public String getEmbedAggregatorName()
{
return embedAggregatorName;
@@ -96,7 +96,7 @@ public abstract class AbstractCompositeAggregator implements CompositeAggregator
{
this.dimensionDescriptorID = dimensionDescriptorID;
}
-
+
@Override
public int getAggregatorID()
{
@@ -118,7 +118,7 @@ public abstract class AbstractCompositeAggregator implements CompositeAggregator
{
this.aggregateDescriptor = aggregateDescriptor;
}
-
+
@Override
public Set<String> getFields()
{
@@ -155,7 +155,7 @@ public abstract class AbstractCompositeAggregator implements CompositeAggregator
{
embedAggregatorDdIds.addAll(ddids);
}
-
+
/**
* bright: TODO: check
*/
@@ -164,5 +164,5 @@ public abstract class AbstractCompositeAggregator implements CompositeAggregator
{
return null;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregatorFactory.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregatorFactory.java
index 8156064..5cf4582 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregatorFactory.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregatorFactory.java
@@ -28,7 +28,7 @@ public abstract class AbstractCompositeAggregatorFactory implements CompositeAgg
protected static final String NAME_TEMPLATE = "%s-%s-%s";
protected static final String PROPERTY_SEPERATOR = "_";
protected static final String PROPERTY_VALUE_SEPERATOR = "|";
-
+
@Override
public String getCompositeAggregatorName(String aggregatorType, String embededAggregatorName,
Map<String, Object> properties)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java
index bf2e342..bf2054e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java
@@ -48,7 +48,7 @@ import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
* the sum aggregator. And the {DimensionsEventregate} event produced by the sum aggregator will contain two fields,
* one for cost and one for revenue.
* </p>
- *
+ *
*
* @since 3.4.0
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractTopBottomAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractTopBottomAggregator.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractTopBottomAggregator.java
index 41d1372..e38ea0e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractTopBottomAggregator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractTopBottomAggregator.java
@@ -43,7 +43,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre
public static final String PROP_COUNT = "count";
protected int count;
protected SortedSet<String> subCombinations = Sets.newTreeSet();
-
+
public AbstractTopBottomAggregator withEmbedAggregatorName(String embedAggregatorName)
{
this.setEmbedAggregatorName(embedAggregatorName);
@@ -55,7 +55,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre
this.setSubCombinations(subCombinations);
return this;
}
-
+
public AbstractTopBottomAggregator withCount(int count)
{
this.setCount(count);
@@ -71,7 +71,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre
{
this.count = count;
}
-
+
public void setSubCombinations(Set<String> subCombinations)
{
this.subCombinations.clear();
@@ -91,11 +91,12 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre
/**
* TOP/BOTTOM return a list of values
*/
+ @Override
public Type getOutputType()
{
return Type.OBJECT;
}
-
+
@Override
public int hashCode()
{
@@ -115,7 +116,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre
if (getClass() != obj.getClass()) {
return false;
}
-
+
AbstractTopBottomAggregator other = (AbstractTopBottomAggregator)obj;
if (embedAggregatorName != other.embedAggregatorName
&& (embedAggregatorName == null || !embedAggregatorName.equals(other.embedAggregatorName))) {
@@ -131,8 +132,8 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre
return true;
}
-
-
+
+
/**
* The result keep a list of object for each aggregate value
* The value of resultAggregate should keep a list of inputEventKey(the value can be get from cache or load) or a map
@@ -149,7 +150,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre
//there are problem for composite's value field descriptor, just ignore now.
GPOMutable resultGpo = resultAggregate.getAggregates();
final List<String> compositeFieldList = resultAggregate.getEventKey().getKey().getFieldDescriptor().getFieldList();
-
+
//Map<EventKey, Aggregate> existedSubEventKeyToAggregate = Maps.newHashMap();
for (String valueField : resultGpo.getFieldDescriptor().getFieldList()) {
//the resultGpo keep a list of sub aggregates
@@ -168,7 +169,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre
/**
* get store map key from the eventKey
- *
+ *
* @param eventKey
* @return
*/
@@ -183,16 +184,16 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre
key.append(subEventKey.getKey().getField(field)).append(KEY_VALUE_SEPERATOR);
}
key.deleteCharAt(key.length() - 1);
-
+
return key.toString();
}
-
+
/**
- * update existed sub aggregate.
+ * update existed sub aggregate.
* The sub aggregates which kept in composite aggregate as candidate could be changed. synchronize the value with
* input aggregates.
- *
+ *
* @param resultAggregate
* @param valueField
* @param inputSubEventKeys
@@ -218,7 +219,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre
}
}
}
-
+
/**
* need a map of value field from the inputGpo to resultGpo, use the index of Fields as the index
* @param resultGpo
@@ -241,7 +242,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre
fieldToType.get(aggregateField));
}
}
-
+
/**
* separate it in case a subclass overrides it.
* @param fieldName
@@ -252,7 +253,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre
{
return Maps.newHashMap();
}
-
+
/**
* compare the result(resultMap) with input(inputFieldName, inputFieldValue)
* @param resultMap
@@ -275,7 +276,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre
}
}
-
+
/**
* should the result element be replaced by the input element.
* the inputElement and resultElement should be same type
@@ -299,11 +300,11 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre
int compareResult = ((Comparable<Object>)resultElement).compareTo(inputElement);
return shouldReplaceResultElement(compareResult);
}
-
+
//handle other cases
throw new RuntimeException("Should NOT come here.");
-
+
}
-
+
protected abstract boolean shouldReplaceResultElement(int resultCompareToInput);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java
index 6482c3b..85f1822 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java
@@ -108,9 +108,9 @@ public class AggregatorRegistry implements Serializable
* {@link IncrementalAggregator} to the corresponding {@link IncrementalAggregator}.
*/
private transient Map<Integer, IncrementalAggregator> incrementalAggregatorIDToAggregator;
-
+
protected transient Map<Integer, AbstractTopBottomAggregator> topBottomAggregatorIDToAggregator;
-
+
/**
* This is a map from the name assigned to an {@link IncrementalAggregator} to the {@link IncrementalAggregator}.
*/
@@ -119,19 +119,19 @@ public class AggregatorRegistry implements Serializable
* This is a map from the name assigned to an {@link OTFAggregator} to the {@link OTFAggregator}.
*/
private Map<String, OTFAggregator> nameToOTFAggregator;
-
+
/**
* the map from TOPN and BOTTOM aggregator to name
*/
private Map<String, AbstractTopBottomAggregator> nameToTopBottomAggregator = Maps.newHashMap();
-
+
/**
* This is a map from the name of an {@link IncrementalAggregator} to the ID of that {@link IncrementalAggregator}.
*/
private Map<String, Integer> incrementalAggregatorNameToID;
-
+
protected Map<String, Integer> topBottomAggregatorNameToID = Maps.newHashMap();
-
+
protected static Set<String> topBottomAggregatorNames;
@@ -269,12 +269,12 @@ public class AggregatorRegistry implements Serializable
Preconditions.checkNotNull(entry.getKey());
Preconditions.checkNotNull(entry.getValue());
}
-
+
for (Map.Entry<String, Integer> entry : topBottomAggregatorNameToID.entrySet()) {
Preconditions.checkNotNull(entry.getKey());
Preconditions.checkNotNull(entry.getValue());
}
-
+
for (Map.Entry<String, AbstractTopBottomAggregator> entry : nameToTopBottomAggregator.entrySet()) {
Preconditions.checkNotNull(entry.getKey());
Preconditions.checkNotNull(entry.getValue());
@@ -337,7 +337,7 @@ public class AggregatorRegistry implements Serializable
nameToTopBottomAggregator.get(aggregatorName));
}
}
-
+
/**
* This is a helper method which sets and validates the given mapping from an {@link IncrementalAggregator}'s name
* to an {@link IncrementalAggregator}.
@@ -375,7 +375,7 @@ public class AggregatorRegistry implements Serializable
nameToOTFAggregator.containsKey(aggregatorName)) {
return true;
}
-
+
//the composite probably send whole aggregator name
String aggregatorType = aggregatorName.split("-")[0];
return (AggregatorTopBottomType.valueOf(aggregatorType) != null);
@@ -399,12 +399,12 @@ public class AggregatorRegistry implements Serializable
{
return nameToOTFAggregator.containsKey(aggregatorName);
}
-
+
public boolean isTopBottomAggregatorType(String aggregatorType)
{
return (AggregatorTopBottomType.valueOf(aggregatorType) != null);
}
-
+
/**
* Gets the mapping from an {@link IncrementalAggregator}'s class to the {@link IncrementalAggregator}.
*
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java
index d9ad83d..006cadf 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java
@@ -164,5 +164,5 @@ public final class AggregatorUtils
return new FieldsDescriptor(fieldToType, fieldToSerde);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregator.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregator.java
index 916467d..e64e957 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregator.java
@@ -37,13 +37,13 @@ public interface CompositeAggregator
public int getDimensionDescriptorID();
public int getAggregatorID();
-
+
public Set<Integer> getEmbedAggregatorDdIds();
-
+
public Set<String> getFields();
public FieldsDescriptor getAggregateDescriptor();
-
+
public FieldsDescriptor getMetaDataDescriptor();
/**
@@ -52,9 +52,9 @@ public interface CompositeAggregator
* @return The output type of the {@link CompositeAggregator}.
*/
public Type getOutputType();
-
+
/**
- *
+ *
* @param resultAggregate the aggregate to put the result
* @param inputEventKeys The input(incremental) event keys, used to locate the input aggregates
* @param inputAggregatesRepo: the map of the EventKey to Aggregate keep the super set of aggregate required
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregatorFactory.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregatorFactory.java
index da1d225..18682d0 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregatorFactory.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregatorFactory.java
@@ -35,7 +35,7 @@ public interface CompositeAggregatorFactory
* @return
*/
//public boolean isValidCompositeAggregatorName(String aggregatorName);
-
+
/**
* get composite aggregator name based on composite aggregator information
* @param aggregatorType
@@ -45,7 +45,7 @@ public interface CompositeAggregatorFactory
*/
public String getCompositeAggregatorName(String aggregatorType, String embedAggregatorName,
Map<String, Object> properties);
-
+
/**
* create composite aggregator name based on composite aggregator information
* @param aggregatorType
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/DefaultCompositeAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/DefaultCompositeAggregatorFactory.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/DefaultCompositeAggregatorFactory.java
index 125c3f1..a3a148a 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/DefaultCompositeAggregatorFactory.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/DefaultCompositeAggregatorFactory.java
@@ -25,7 +25,7 @@ import com.google.common.collect.Maps;
/**
* The DefaultCompositeAggregatorFactory finds the specific factory according to the aggregator type
* and delegates to the specific factory.
- *
+ *
*
* @since 3.4.0
*/
@@ -34,9 +34,9 @@ public class DefaultCompositeAggregatorFactory implements CompositeAggregatorFac
public static final DefaultCompositeAggregatorFactory defaultInst = new DefaultCompositeAggregatorFactory()
.addFactory(AggregatorTopBottomType.TOPN.name(), TopBottomAggregatorFactory.defaultInstance)
.addFactory(AggregatorTopBottomType.BOTTOMN.name(), TopBottomAggregatorFactory.defaultInstance);
-
+
protected Map<String, CompositeAggregatorFactory> factoryRepository = Maps.newHashMap();
-
+
@Override
public String getCompositeAggregatorName(String aggregatorType, String embedAggregatorName,
Map<String, Object> properties)
@@ -57,7 +57,7 @@ public class DefaultCompositeAggregatorFactory implements CompositeAggregatorFac
{
return factoryRepository.get(aggregatorType);
}
-
+
public DefaultCompositeAggregatorFactory addFactory(String aggregatorType, CompositeAggregatorFactory factory)
{
factoryRepository.put(aggregatorType, factory);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/TopBottomAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/TopBottomAggregatorFactory.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/TopBottomAggregatorFactory.java
index 89f6bb7..8843b4e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/TopBottomAggregatorFactory.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/TopBottomAggregatorFactory.java
@@ -33,7 +33,7 @@ public class TopBottomAggregatorFactory extends AbstractCompositeAggregatorFacto
public static final String PROPERTY_NAME_SUB_COMBINATIONS = "subCombinations";
public static final TopBottomAggregatorFactory defaultInstance = new TopBottomAggregatorFactory();
-
+
@Override
public <T> AbstractTopBottomAggregator createCompositeAggregator(String aggregatorType, String embedAggregatorName,
Map<String, Object> properties)
@@ -41,7 +41,7 @@ public class TopBottomAggregatorFactory extends AbstractCompositeAggregatorFacto
return createTopBottomAggregator(aggregatorType, embedAggregatorName, getCount(properties),
getSubCombinations(properties));
}
-
+
public <T> AbstractTopBottomAggregator createTopBottomAggregator(String aggregatorType, String embedAggregatorName,
int count, String[] subCombinations)
{
@@ -58,7 +58,7 @@ public class TopBottomAggregatorFactory extends AbstractCompositeAggregatorFacto
aggregator.setEmbedAggregatorName(embedAggregatorName);
aggregator.setCount(count);
aggregator.setSubCombinations(subCombinations);
-
+
return aggregator;
}
@@ -66,12 +66,12 @@ public class TopBottomAggregatorFactory extends AbstractCompositeAggregatorFacto
{
return Integer.valueOf((String)properties.get(PROPERTY_NAME_COUNT));
}
-
+
protected String[] getSubCombinations(Map<String, Object> properties)
{
return (String[])properties.get(PROPERTY_NAME_SUB_COMBINATIONS);
}
-
+
/**
* The properties of TOP or BOTTOM are count and subCombinations.
* count only have one value and subCombinations is a set of string, we can order combinations to simplify the name
@@ -82,13 +82,13 @@ public class TopBottomAggregatorFactory extends AbstractCompositeAggregatorFacto
StringBuilder sb = new StringBuilder();
String count = (String)properties.get(PROPERTY_NAME_COUNT);
sb.append(count).append(PROPERTY_SEPERATOR);
-
+
String[] subCombinations = (String[])properties.get(PROPERTY_NAME_SUB_COMBINATIONS);
Set<String> sortedSubCombinations = Sets.newTreeSet();
for (String subCombination : subCombinations) {
sortedSubCombinations.add(subCombination);
}
-
+
for (String subCombination : sortedSubCombinations) {
sb.append(subCombination).append(PROPERTY_SEPERATOR);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
index 31f35aa..268c51b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
@@ -32,7 +32,7 @@ import com.datatorrent.lib.io.block.ReaderContext;
* This operator can be used for reading records/tuples from Filesystem in
* parallel (without ordering guarantees between tuples). Records can be
* delimited (e.g. newline) or fixed width records. Output tuples are byte[].
- *
+ *
* Typically, this operator will be connected to output of FileSplitterInput to
* read records in parallel.
*
@@ -106,7 +106,7 @@ public class FSRecordReader extends FSSliceReader
/**
* Criteria for record split
- *
+ *
* @param mode
* Mode
*/
@@ -117,7 +117,7 @@ public class FSRecordReader extends FSSliceReader
/**
* Criteria for record split
- *
+ *
* @return mode
*/
public RECORD_READER_MODE getMode()
@@ -127,7 +127,7 @@ public class FSRecordReader extends FSSliceReader
/**
* Length for fixed width record
- *
+ *
* @param recordLength
*/
public void setRecordLength(int recordLength)
@@ -140,7 +140,7 @@ public class FSRecordReader extends FSSliceReader
/**
* Length for fixed width record
- *
+ *
* @return record length
*/
public int getRecordLength()
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
index 0a9b321..d508320 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
@@ -42,7 +42,7 @@ import com.datatorrent.lib.io.fs.FileSplitterInput;
* (Ordering is not guaranteed when records are read in parallel)
*
* Input directory is scanned at specified interval to poll for new data.
- *
+ *
* The module reads data in parallel, following parameters can be configured
* <br/>
* 1. files: list of file(s)/directories to read<br/>
@@ -91,7 +91,7 @@ public class FSRecordReaderModule implements Module
/**
* Creates an instance of FileSplitter
- *
+ *
* @return
*/
public FileSplitterInput createFileSplitter()
@@ -101,7 +101,7 @@ public class FSRecordReaderModule implements Module
/**
* Creates an instance of Record Reader
- *
+ *
* @return FSRecordReader instance
*/
public FSRecordReader createRecordReader()
@@ -233,7 +233,7 @@ public class FSRecordReaderModule implements Module
/**
* Gets readers count
- *
+ *
* @return readersCount
*/
public int getReadersCount()
@@ -243,7 +243,7 @@ public class FSRecordReaderModule implements Module
/**
* Static count of readers to read input file
- *
+ *
* @param readersCount
*/
public void setReadersCount(int readersCount)
@@ -276,7 +276,7 @@ public class FSRecordReaderModule implements Module
* Sets number of blocks to be emitted per window.<br/>
* A lot of blocks emitted per window can overwhelm the downstream operators.
* Set this value considering blockSize and readersCount.
- *
+ *
* @param threshold
*/
public void setBlocksThreshold(int threshold)
@@ -288,7 +288,7 @@ public class FSRecordReaderModule implements Module
* Gets number of blocks to be emitted per window.<br/>
* A lot of blocks emitted per window can overwhelm the downstream operators.
* Set this value considering blockSize and readersCount.
- *
+ *
* @return
*/
public int getBlocksThreshold()
@@ -298,7 +298,7 @@ public class FSRecordReaderModule implements Module
/**
* Criteria for record split
- *
+ *
* @return mode
*/
public RECORD_READER_MODE getMode()
@@ -308,7 +308,7 @@ public class FSRecordReaderModule implements Module
/**
* Criteria for record split
- *
+ *
* @param mode
* Mode
*/
@@ -319,7 +319,7 @@ public class FSRecordReaderModule implements Module
/**
* Length for fixed width record
- *
+ *
* @return record length
*/
public int getRecordLength()
@@ -329,7 +329,7 @@ public class FSRecordReaderModule implements Module
/**
* Length for fixed width record
- *
+ *
* @param recordLength
*/
public void setRecordLength(int recordLength)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
index 237f4b9..3b01ed2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -193,7 +193,7 @@ public class IncrementalCheckpointManager extends FSWindowDataManager
Map<Long, Object> artifactPerWindow = new HashMap<>();
FileSystemWAL.FileSystemWALReader reader = getWal().getReader();
reader.seek(getWal().getWalStartPointer());
-
+
Slice windowSlice = readNext(reader);
while (reader.getCurrentPointer().compareTo(getWal().getWalEndPointerAfterRecovery()) < 0 && windowSlice != null) {
long window = Longs.fromByteArray(windowSlice.toByteArray());
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java
index b12f119..52dfac7 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java
@@ -117,13 +117,13 @@ public class TimeBasedPriorityQueue<T>
} else if (this.timestamp > timeWrapper.getTimestamp()) {
return 1;
}
-
+
/**
* NOTE: the following use the equals() to implement the compareTo() for key.
- * it should be OK as the compareTo() only used by TimeBasedPriorityQueue.sortedTimestamp,
+ * it should be OK as the compareTo() only used by TimeBasedPriorityQueue.sortedTimestamp,
* which only care about the order of time ( the order for key doesn't matter ).
* But would cause problem if add other function which depended on the order of the key.
- *
+ *
* Add compare by hashCode when not equals in order to compatible with the interface for most cases.
* Anyway, the order of key is not guaranteed. And we should not return 0 if not equals
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
index 2b85580..6e8774e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
@@ -134,7 +134,7 @@ public class FSWindowDataManager implements WindowDataManager
* Used by {@link IncrementalCheckpointManager}
*/
private boolean relyOnCheckpoints;
-
+
private transient long largestCompletedWindow = Stateless.WINDOW_ID;
private final FSWindowReplayWAL wal = new FSWindowReplayWAL();
@@ -303,7 +303,7 @@ public class FSWindowDataManager implements WindowDataManager
long lastWindow = Stateless.WINDOW_ID;
Slice slice = readNext(reader);
-
+
while (slice != null) {
boolean skipComplete = skipNext(reader); //skip the artifact because we need just the largest window id.
if (!skipComplete) {
@@ -311,7 +311,7 @@ public class FSWindowDataManager implements WindowDataManager
break;
}
long offset = reader.getCurrentPointer().getOffset();
-
+
long window = Longs.fromByteArray(slice.toByteArray());
if (ceilingWindow != null && window > ceilingWindow) {
break;
@@ -393,7 +393,7 @@ public class FSWindowDataManager implements WindowDataManager
}
}
}
-
+
/**
* Save writes 2 entries to the wal: <br/>
* <ol>
@@ -481,7 +481,7 @@ public class FSWindowDataManager implements WindowDataManager
wal.windowWalParts.put(currentWindow, reader.getCurrentPointer().getPartNum());
wal.retrievedWindow = readNext(reader); //null or next window
-
+
return fromSlice(data);
} else if (windowId < currentWindow) {
//no artifact saved corresponding to that window and artifact is not read.
@@ -500,7 +500,7 @@ public class FSWindowDataManager implements WindowDataManager
}
return null;
}
-
+
/**
* Deletes artifacts for all windows less than equal to committed window id.<p/>
*
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java
index 74ca929..d848804 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java
@@ -73,12 +73,12 @@ public class FSWindowReplayWAL extends FileSystemWAL
throw new RuntimeException("while setup");
}
}
-
+
public FileSystemWALPointer getWalEndPointerAfterRecovery()
{
return walEndPointerAfterRecovery;
}
-
+
/**
* Finalizes files just after rotation. Doesn't wait for the window to be committed.
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
index b7d5ba1..49f61a4 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
@@ -163,7 +163,7 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
{
return filePath + "_" + partNumber;
}
-
+
/**
* @return the wal start pointer
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java
index f0a66a4..af623f3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java
@@ -33,7 +33,7 @@ public class Average implements Accumulation<Double, MutablePair<Double, Long>,
{
return new MutablePair<>(0.0, 0L);
}
-
+
@Override
public MutablePair<Double, Long> accumulate(MutablePair<Double, Long> accu, Double input)
{
@@ -41,7 +41,7 @@ public class Average implements Accumulation<Double, MutablePair<Double, Long>,
accu.setRight(accu.getRight() + 1);
return accu;
}
-
+
@Override
public MutablePair<Double, Long> merge(MutablePair<Double, Long> accu1, MutablePair<Double, Long> accu2)
{
@@ -50,13 +50,13 @@ public class Average implements Accumulation<Double, MutablePair<Double, Long>,
accu1.setRight(accu1.getRight() + accu2.getRight());
return accu1;
}
-
+
@Override
public Double getOutput(MutablePair<Double, Long> accumulatedValue)
{
return accumulatedValue.getLeft();
}
-
+
@Override
public Double getRetraction(Double value)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java
index f2affd1..d217ce9 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java
@@ -35,27 +35,27 @@ public class Group<T> implements Accumulation<T, List<T>, List<T>>
{
return new ArrayList<>();
}
-
+
@Override
public List<T> accumulate(List<T> accumulatedValue, T input)
{
accumulatedValue.add(input);
return accumulatedValue;
}
-
+
@Override
public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
{
accumulatedValue1.addAll(accumulatedValue2);
return accumulatedValue1;
}
-
+
@Override
public List<T> getOutput(List<T> accumulatedValue)
{
return accumulatedValue;
}
-
+
@Override
public List<T> getRetraction(List<T> value)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java
index 64ff0c4..92aec18 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java
@@ -28,20 +28,20 @@ import org.apache.apex.malhar.lib.window.Accumulation;
*/
public class Max<T> implements Accumulation<T, T, T>
{
-
+
Comparator<T> comparator;
-
+
public void setComparator(Comparator<T> comparator)
{
this.comparator = comparator;
}
-
+
@Override
public T defaultAccumulatedValue()
{
return null;
}
-
+
@Override
public T accumulate(T accumulatedValue, T input)
{
@@ -55,19 +55,19 @@ public class Max<T> implements Accumulation<T, T, T>
throw new RuntimeException("Tuple cannot be compared");
}
}
-
+
@Override
public T merge(T accumulatedValue1, T accumulatedValue2)
{
return accumulate(accumulatedValue1, accumulatedValue2);
}
-
+
@Override
public T getOutput(T accumulatedValue)
{
return accumulatedValue;
}
-
+
@Override
public T getRetraction(T value)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java
index 48017a7..2b6247a 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java
@@ -28,20 +28,20 @@ import org.apache.apex.malhar.lib.window.Accumulation;
*/
public class Min<T> implements Accumulation<T, T, T>
{
-
+
Comparator<T> comparator;
-
+
public void setComparator(Comparator<T> comparator)
{
this.comparator = comparator;
}
-
+
@Override
public T defaultAccumulatedValue()
{
return null;
}
-
+
@Override
public T accumulate(T accumulatedValue, T input)
{
@@ -55,19 +55,19 @@ public class Min<T> implements Accumulation<T, T, T>
throw new RuntimeException("Tuple cannot be compared");
}
}
-
+
@Override
public T merge(T accumulatedValue1, T accumulatedValue2)
{
return accumulate(accumulatedValue1, accumulatedValue2);
}
-
+
@Override
public T getOutput(T accumulatedValue)
{
return accumulatedValue;
}
-
+
@Override
public T getRetraction(T value)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java
index 2548f72..53f3534 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java
@@ -38,14 +38,14 @@ public class RemoveDuplicates<T> implements Accumulation<T, Set<T>, List<T>>
{
return new HashSet<>();
}
-
+
@Override
public Set<T> accumulate(Set<T> accumulatedValue, T input)
{
accumulatedValue.add(input);
return accumulatedValue;
}
-
+
@Override
public Set<T> merge(Set<T> accumulatedValue1, Set<T> accumulatedValue2)
{
@@ -54,7 +54,7 @@ public class RemoveDuplicates<T> implements Accumulation<T, Set<T>, List<T>>
}
return accumulatedValue1;
}
-
+
@Override
public List<T> getOutput(Set<T> accumulatedValue)
{
@@ -64,7 +64,7 @@ public class RemoveDuplicates<T> implements Accumulation<T, Set<T>, List<T>>
return new ArrayList<>(accumulatedValue);
}
}
-
+
@Override
public List<T> getRetraction(List<T> value)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
index 11ab2ab..475d653 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
@@ -33,27 +33,27 @@ public class SumDouble implements Accumulation<Double, MutableDouble, Double>
{
return new MutableDouble(0.0);
}
-
+
@Override
public MutableDouble accumulate(MutableDouble accumulatedValue, Double input)
{
accumulatedValue.add(input);
return accumulatedValue;
}
-
+
@Override
public MutableDouble merge(MutableDouble accumulatedValue1, MutableDouble accumulatedValue2)
{
accumulatedValue1.add(accumulatedValue2);
return accumulatedValue1;
}
-
+
@Override
public Double getOutput(MutableDouble accumulatedValue)
{
return accumulatedValue.doubleValue();
}
-
+
@Override
public Double getRetraction(Double value)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
index d11bec3..dff3be6 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
@@ -33,27 +33,27 @@ public class SumFloat implements Accumulation<Float, MutableFloat, Float>
{
return new MutableFloat(0.);
}
-
+
@Override
public MutableFloat accumulate(MutableFloat accumulatedValue, Float input)
{
accumulatedValue.add(input);
return accumulatedValue;
}
-
+
@Override
public MutableFloat merge(MutableFloat accumulatedValue1, MutableFloat accumulatedValue2)
{
accumulatedValue1.add(accumulatedValue2);
return accumulatedValue1;
}
-
+
@Override
public Float getOutput(MutableFloat accumulatedValue)
{
return accumulatedValue.floatValue();
}
-
+
@Override
public Float getRetraction(Float value)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
index cf0c50e..dca67a4 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
@@ -33,27 +33,27 @@ public class SumInt implements Accumulation<Integer, MutableInt, Integer>
{
return new MutableInt(0);
}
-
+
@Override
public MutableInt accumulate(MutableInt accumulatedValue, Integer input)
{
accumulatedValue.add(input);
return accumulatedValue;
}
-
+
@Override
public MutableInt merge(MutableInt accumulatedValue1, MutableInt accumulatedValue2)
{
accumulatedValue1.add(accumulatedValue2);
return accumulatedValue1;
}
-
+
@Override
public Integer getOutput(MutableInt accumulatedValue)
{
return accumulatedValue.intValue();
}
-
+
@Override
public Integer getRetraction(Integer value)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
index 55908f5..027e4f8 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
@@ -33,27 +33,27 @@ public class SumLong implements Accumulation<Long, MutableLong, Long>
{
return new MutableLong(0L);
}
-
+
@Override
public MutableLong accumulate(MutableLong accumulatedValue, Long input)
{
accumulatedValue.add(input);
return accumulatedValue;
}
-
+
@Override
public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
{
accumulatedValue1.add(accumulatedValue2);
return accumulatedValue1;
}
-
+
@Override
public Long getOutput(MutableLong accumulatedValue)
{
return accumulatedValue.longValue();
}
-
+
@Override
public Long getRetraction(Long value)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java b/library/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java
index 779b0f0..6db0557 100644
--- a/library/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java
+++ b/library/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java
@@ -47,18 +47,18 @@ import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
/**
- *
+ *
* <ul>
- * <li>The file format of DTFile is same as {@link TFile} with different reader implementation.
+ * <li>The file format of DTFile is same as {@link TFile} with different reader implementation.
* It reads data block by block and cache the binary block data into memory to speed up the random read.
- *
- * <li>The public api of {@link Reader} is as same as it is in {@link TFile} {@link org.apache.hadoop.io.file.tfile.TFile.Reader} implementation.
+ *
+ * <li>The public api of {@link Reader} is as same as it is in {@link TFile} {@link org.apache.hadoop.io.file.tfile.TFile.Reader} implementation.
* Besides, it provides getBlockBuffer(), getKeyOffset(), getKeyLength(), getValueOffset(), getValueLength() method
* to expose raw block, key, value data to user to avoid unnecessary internal/external data copy
- *
+ *
* <li>In the performance test, It shows no difference in sequential reads and 20x faster in random reads(If most of them hit memory)
* </ul>
- *
+ *
* Block Compressed file, the underlying physical storage layer for TFile.
* BCFile provides the basic block level compression for the data block and meta
* blocks. It is separated from TFile as it may be used for other
@@ -102,7 +102,7 @@ final class DTBCFile {
private static interface BlockRegister {
/**
* Register a block that is fully closed.
- *
+ *
* @param raw
* The size of block in terms of uncompressed bytes.
* @param offsetStart
@@ -156,7 +156,7 @@ final class DTBCFile {
/**
* Get the output stream for BlockAppender's consumption.
- *
+ *
* @return the output stream suitable for writing block data.
*/
OutputStream getOutputStream() {
@@ -165,7 +165,7 @@ final class DTBCFile {
/**
* Get the current position in file.
- *
+ *
* @return The current byte offset in underlying file.
* @throws IOException
*/
@@ -179,7 +179,7 @@ final class DTBCFile {
/**
* Current size of compressed data.
- *
+ *
* @return
* @throws IOException
*/
@@ -206,7 +206,7 @@ final class DTBCFile {
/**
* Access point to stuff data into a block.
- *
+ *
* TODO: Change DataOutputStream to something else that tracks the size as
* long instead of int. Currently, we will wrap around if the row block size
* is greater than 4GB.
@@ -219,7 +219,7 @@ final class DTBCFile {
/**
* Constructor
- *
+ *
* @param register
* the block register, which is called when the block is closed.
* @param wbs
@@ -233,7 +233,7 @@ final class DTBCFile {
/**
* Get the raw size of the block.
- *
+ *
* @return the number of uncompressed bytes written through the
* BlockAppender so far.
* @throws IOException
@@ -248,7 +248,7 @@ final class DTBCFile {
/**
* Get the compressed size of the block in progress.
- *
+ *
* @return the number of compressed bytes written to the underlying FS
* file. The size may be smaller than actual need to compress the
* all data written due to internal buffering inside the
@@ -289,7 +289,7 @@ final class DTBCFile {
/**
* Constructor
- *
+ *
* @param fout
* FS output stream.
* @param compressionName
@@ -383,7 +383,7 @@ final class DTBCFile {
* block. There can only be one BlockAppender stream active at any time.
* Regular Blocks may not be created after the first Meta Blocks. The caller
* must call BlockAppender.close() to conclude the block creation.
- *
+ *
* @param name
* The name of the Meta Block. The name must not conflict with
* existing Meta Blocks.
@@ -407,7 +407,7 @@ final class DTBCFile {
* active at any time. Regular Blocks may not be created after the first
* Meta Blocks. The caller must call BlockAppender.close() to conclude the
* block creation.
- *
+ *
* @param name
* The name of the Meta Block. The name must not conflict with
* existing Meta Blocks.
@@ -426,7 +426,7 @@ final class DTBCFile {
* block. There can only be one BlockAppender stream active at any time.
* Data Blocks may not be created after the first Meta Blocks. The caller
* must call BlockAppender.close() to conclude the block creation.
- *
+ *
* @return The BlockAppender stream
* @throws IOException
*/
@@ -474,7 +474,7 @@ final class DTBCFile {
/**
* Callback to make sure a data block is added to the internal list when
* it's being closed.
- *
+ *
*/
private class DataBlockRegister implements BlockRegister {
DataBlockRegister() {
@@ -545,7 +545,7 @@ final class DTBCFile {
/**
* Get the output stream for BlockAppender's consumption.
- *
+ *
* @return the output stream suitable for writing block data.
*/
public ReusableByteArrayInputStream getInputStream() {
@@ -579,7 +579,7 @@ final class DTBCFile {
public static class BlockReader extends DataInputStream {
private final RBlockState rBlkState;
private boolean closed = false;
-
+
private ReusableByteArrayInputStream wrappedInputStream = null;
BlockReader(RBlockState rbs) {
@@ -607,7 +607,7 @@ final class DTBCFile {
/**
* Get the name of the compression algorithm used to compress the block.
- *
+ *
* @return name of the compression algorithm.
*/
public String getCompressionName() {
@@ -616,7 +616,7 @@ final class DTBCFile {
/**
* Get the uncompressed size of the block.
- *
+ *
* @return uncompressed size of the block.
*/
public long getRawSize() {
@@ -625,7 +625,7 @@ final class DTBCFile {
/**
* Get the compressed size of the block.
- *
+ *
* @return compressed size of the block.
*/
public long getCompressedSize() {
@@ -634,7 +634,7 @@ final class DTBCFile {
/**
* Get the starting position of the block in the file.
- *
+ *
* @return the starting position of the block in the file.
*/
public long getStartPos() {
@@ -646,7 +646,7 @@ final class DTBCFile {
closed = false;
rBlkState.renew();
}
-
+
public ReusableByteArrayInputStream getBlockDataInputStream()
{
return wrappedInputStream;
@@ -655,7 +655,7 @@ final class DTBCFile {
/**
* Constructor
- *
+ *
* @param fin
* FS input stream.
* @param fileLength
@@ -696,7 +696,7 @@ final class DTBCFile {
/**
* Get the name of the default compression algorithm.
- *
+ *
* @return the name of the default compression algorithm.
*/
public String getDefaultCompressionName() {
@@ -705,7 +705,7 @@ final class DTBCFile {
/**
* Get version of BCFile file being read.
- *
+ *
* @return version of BCFile file being read.
*/
public Version getBCFileVersion() {
@@ -714,7 +714,7 @@ final class DTBCFile {
/**
* Get version of BCFile API.
- *
+ *
* @return version of BCFile API.
*/
public Version getAPIVersion() {
@@ -733,7 +733,7 @@ final class DTBCFile {
/**
* Get the number of data blocks.
- *
+ *
* @return the number of data blocks.
*/
public int getBlockCount() {
@@ -742,7 +742,7 @@ final class DTBCFile {
/**
* Stream access to a Meta Block.
- *
+ *
* @param name
* meta block name
* @return BlockReader input stream for reading the meta block.
@@ -763,7 +763,7 @@ final class DTBCFile {
/**
* Stream access to a Data Block.
- *
+ *
* @param blockIndex
* 0-based data block index.
* @return BlockReader input stream for reading the data block.
@@ -797,7 +797,7 @@ final class DTBCFile {
/**
* Find the smallest Block index whose starting offset is greater than or
* equal to the specified offset.
- *
+ *
* @param offset
* User-specific offset.
* @return the index to the data Block if such block exists; or -1
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java b/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java
index cb559dc..d9c483e 100644
--- a/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java
+++ b/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java
@@ -28,9 +28,9 @@ import java.io.ByteArrayInputStream;
*/
public class ReusableByteArrayInputStream extends ByteArrayInputStream
{
-
+
private final int initialOffset;
-
+
private final int initialLength;
public ReusableByteArrayInputStream(byte[] buf, int offset, int length)
@@ -53,12 +53,12 @@ public class ReusableByteArrayInputStream extends ByteArrayInputStream
count = initialLength;
mark = 0;
}
-
+
public int getPos()
{
return pos;
}
-
+
public byte[] getBuf()
{
return buf;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/algo/BottomNUnifierTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/BottomNUnifierTest.java b/library/src/test/java/com/datatorrent/lib/algo/BottomNUnifierTest.java
index 7f3061b..46222b1 100644
--- a/library/src/test/java/com/datatorrent/lib/algo/BottomNUnifierTest.java
+++ b/library/src/test/java/com/datatorrent/lib/algo/BottomNUnifierTest.java
@@ -32,13 +32,13 @@ public class BottomNUnifierTest
@Test
public void testUnifier()
{
-
+
// Instantiate unifier
BottomNUnifier<String, Integer> oper = new BottomNUnifier<>();
oper.setN(2);
CollectorTestSink sink = new CollectorTestSink();
oper.mergedport.setSink(sink);
-
+
oper.beginWindow(1);
ArrayList<Integer> values = new ArrayList<Integer>();
values.add(5);
@@ -53,7 +53,7 @@ public class BottomNUnifierTest
tuple.put("a", values);
oper.process(tuple);
oper.endWindow();
-
+
Assert.assertEquals("Tuples in sink", sink.collectedTuples.size(), 1);
tuple = (HashMap<String, ArrayList<Integer>>)sink.collectedTuples.get(0);
values = tuple.get("a");
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/converter/MapToKeyValuePairConverterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/converter/MapToKeyValuePairConverterTest.java b/library/src/test/java/com/datatorrent/lib/converter/MapToKeyValuePairConverterTest.java
index dd19d2b..0c45542 100644
--- a/library/src/test/java/com/datatorrent/lib/converter/MapToKeyValuePairConverterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/converter/MapToKeyValuePairConverterTest.java
@@ -30,25 +30,25 @@ import com.datatorrent.lib.util.TestUtils;
public class MapToKeyValuePairConverterTest
{
@Test
- public void MapToKeyValuePairConversion()
+ public void MapToKeyValuePairConversion()
{
MapToKeyValuePairConverter<String, Integer> testop = new MapToKeyValuePairConverter<String, Integer>();
Integer[] values = {1, 2, 3};
String[] keys = {"a", "b", "c"};
-
+
HashMap<String, Integer> inputMap = new HashMap<String, Integer>();
for (int i = 0; i < 3; i++) {
inputMap.put(keys[i], values[i]);
}
-
- CollectorTestSink<KeyValPair<String, Integer>> testsink = new CollectorTestSink<KeyValPair<String, Integer>>();
+
+ CollectorTestSink<KeyValPair<String, Integer>> testsink = new CollectorTestSink<KeyValPair<String, Integer>>();
TestUtils.setSink(testop.output, testsink);
-
+
testop.beginWindow(0);
-
+
testop.input.put(inputMap);
-
+
testop.endWindow();
Assert.assertEquals(3,testsink.collectedTuples.size());
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMapTest.java b/library/src/test/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMapTest.java
index 8a5eed2..22e9f72 100644
--- a/library/src/test/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMapTest.java
+++ b/library/src/test/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMapTest.java
@@ -31,25 +31,25 @@ public class StringValueToNumberConverterForMapTest
{
@Test
- public void testStringValueToNumericConversion()
+ public void testStringValueToNumericConversion()
{
StringValueToNumberConverterForMap<String> testop = new StringValueToNumberConverterForMap<String>();
String[] values = {"1.0", "2.0", "3.0"};
String[] keys = {"a", "b", "c"};
-
+
HashMap<String, String> inputMap = new HashMap<String, String>();
for (int i = 0; i < 3; i++) {
inputMap.put(keys[i], values[i]);
}
-
- CollectorTestSink<Map<String, Number>> testsink = new CollectorTestSink<Map<String, Number>>();
+
+ CollectorTestSink<Map<String, Number>> testsink = new CollectorTestSink<Map<String, Number>>();
TestUtils.setSink(testop.output, testsink);
-
+
testop.beginWindow(0);
-
+
testop.input.put(inputMap);
-
+
testop.endWindow();
Assert.assertEquals(1,testsink.collectedTuples.size());
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/db/cache/CacheStoreTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/cache/CacheStoreTest.java b/library/src/test/java/com/datatorrent/lib/db/cache/CacheStoreTest.java
index 335418a..61464dd 100644
--- a/library/src/test/java/com/datatorrent/lib/db/cache/CacheStoreTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/cache/CacheStoreTest.java
@@ -33,7 +33,7 @@ public class CacheStoreTest
public void CacheStoreTest()
{
final Map<Object, Object> backupMap = Maps.newHashMap();
-
+
backupMap.put(1, "one");
backupMap.put(2, "two");
backupMap.put(3, "three");
@@ -44,7 +44,7 @@ public class CacheStoreTest
backupMap.put(8, "eight");
backupMap.put(9, "nine");
backupMap.put(10, "ten");
-
+
CacheStore cs = new CacheStore();
cs.setMaxCacheSize(5);
try {
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java
index 908f02f..5c398d2 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java
@@ -121,7 +121,7 @@ public class JdbcIOAppTest
lma.prepareDAG(new JdbcIOApp(), conf);
LocalMode.Controller lc = lma.getController();
lc.runAsync();
- // wait for records to be added to table
+ // wait for records to be added to table
Thread.sleep(3000);
lc.shutdown();
Assert.assertEquals("Events in store", 10, getNumOfEventsInStore());
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
index 1ffe256..ac17c2f 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
@@ -121,12 +121,12 @@ public class JdbcOperatorTest
{
this.startTimestamp = startTimestamp;
}
-
+
public double getScore()
{
return score;
}
-
+
public void setScore(double score)
{
this.score = score;
@@ -225,7 +225,7 @@ public class JdbcOperatorTest
pStmt.setDouble(6, new Double(55.4));
pStmt.executeUpdate();
}
-
+
} catch (SQLException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java b/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java
index 1fe6484..567e27a 100644
--- a/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java
@@ -175,7 +175,7 @@ public class FilterTest
clearFilterOperator();
}
-
+
@Test
public void testOptionalExpressionFunctions()
{
@@ -183,7 +183,7 @@ public class FilterTest
prepareFilterOperator(DummyPublicPOJO.class, "({$}.val == 1)");
Assert.assertEquals(6, filter.getExpressionFunctions().size());
}
-
+
@Test
public void testSetOptionalExpressionFunctionsItem()
{
@@ -191,8 +191,8 @@ public class FilterTest
prepareFilterOperator(DummyPublicPOJO.class, "({$}.val == 1)");
Assert.assertEquals(6, filter.getExpressionFunctions().size());
}
-
-
+
+
@Before
public void setup()
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java b/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
index bb51ca4..c3e2cde 100644
--- a/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
@@ -173,7 +173,7 @@ public class XmlFormatterTest
+ "</EmployeeBean>";
Assert.assertEquals(expected, validDataSink.collectedTuples.get(0));
}
-
+
public static class DateAdapter extends XmlAdapter<String, Date>
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index e1f23d1..2f926d3 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -88,7 +88,7 @@ public class AbstractFileInputOperatorTest
@Rule
public TestMeta testMeta = new TestMeta();
-
+
@Test
public void testSinglePartiton() throws Exception
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index 0fff870..03f3bf6 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -1422,7 +1422,7 @@ public class AbstractFileOutputOperatorTest
Assert.assertEquals("Max length validation not thrown with -1 max length", true, error);
}
-
+
@Test
public void testPeriodicRotation()
{
@@ -1439,7 +1439,7 @@ public class AbstractFileOutputOperatorTest
for (int j = 0; j < i; ++j) {
writer.input.put(2 * j + 1);
}
- writer.endWindow();
+ writer.endWindow();
}
writer.committed(29);
Set<String> fileNames = new TreeSet<String>();
@@ -1543,7 +1543,7 @@ public class AbstractFileOutputOperatorTest
// http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4691425
List<Long> evenOffsets = new ArrayList<Long>();
List<Long> oddOffsets = new ArrayList<Long>();
-
+
writer.setFilePath(testMeta.getDir());
writer.setAlwaysWriteToTmp(false);
writer.setup(testMeta.testOperatorContext);
@@ -1633,7 +1633,7 @@ public class AbstractFileOutputOperatorTest
throw new RuntimeException(e);
}
}
-
+
int numWindows = 0;
try {
fis = new FileInputStream(file);
@@ -1651,7 +1651,7 @@ public class AbstractFileOutputOperatorTest
throw new RuntimeException(e);
}
}
-
+
long startOffset = 0;
for (long offset : offsets) {
// Skip initial case in case file is not yet created
@@ -1792,8 +1792,8 @@ public class AbstractFileOutputOperatorTest
{
counterStream = new CounterFilterOutputStream(outputStream);
}
-
- public boolean isDoInit()
+
+ public boolean isDoInit()
{
return (counterStream == null);
}
@@ -1809,7 +1809,7 @@ public class AbstractFileOutputOperatorTest
{
}
-
+
public long getCounter()
{
if (isDoInit()) {
@@ -1817,10 +1817,10 @@ public class AbstractFileOutputOperatorTest
} else {
return counterStream.getCounter();
}
-
+
}
}
-
+
private static class CounterFilterOutputStream extends FilterOutputStream
{
long counter;
@@ -1830,7 +1830,7 @@ public class AbstractFileOutputOperatorTest
{
super(out);
}
-
+
@Override
public void write(int b) throws IOException
{
@@ -1869,5 +1869,5 @@ public class AbstractFileOutputOperatorTest
return counter;
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperatorTest.java
index e5193b6..17febf6 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperatorTest.java
@@ -173,7 +173,7 @@ public class AbstractSingleFileOutputOperatorTest
{
writer.setOutputFileName(SINGLE_FILE);
writer.setPartitionedFileNameformat("");
-
+
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/io/fs/FastMergerDecisionMakerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FastMergerDecisionMakerTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FastMergerDecisionMakerTest.java
index e1f57d9..e0ca9b6 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FastMergerDecisionMakerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FastMergerDecisionMakerTest.java
@@ -74,7 +74,7 @@ public class FastMergerDecisionMakerTest
/**
* If some block is missing then expect BlockNotFoundException.
- *
+ *
* @throws IOException
* @throws BlockNotFoundException
*/
@@ -111,7 +111,7 @@ public class FastMergerDecisionMakerTest
/**
* All blocks are of same size which is same as default blockSize. Then fast
* merge is possible
- *
+ *
* @throws IOException
* @throws BlockNotFoundException
*/
@@ -126,7 +126,7 @@ public class FastMergerDecisionMakerTest
* All blocks (except last block)are of same size which is same as default
* blockSize. Last block is smaller than default blockSize Then fast merge is
* possible
- *
+ *
* @throws IOException
* @throws BlockNotFoundException
*/
@@ -141,7 +141,7 @@ public class FastMergerDecisionMakerTest
/**
* Some block other than last block is of different size. Then fast merge is
* not possible
- *
+ *
* @throws IOException
* @throws BlockNotFoundException
*/
@@ -156,7 +156,7 @@ public class FastMergerDecisionMakerTest
/**
* Some block other than last block is of different size. Then fast merge is
* not possible
- *
+ *
* @throws IOException
* @throws BlockNotFoundException
*/
@@ -171,7 +171,7 @@ public class FastMergerDecisionMakerTest
/**
* Some block other than last block is of different size. Then fast merge is
* not possible
- *
+ *
* @throws IOException
* @throws BlockNotFoundException
*/