You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/05/18 20:42:07 UTC
[17/22] incubator-apex-malhar git commit: APEXMALHAR-2095 removed
checkstyle violations of malhar library module
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/FieldsDescriptor.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/FieldsDescriptor.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/FieldsDescriptor.java
index 0bc4712..ec00fda 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/FieldsDescriptor.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/FieldsDescriptor.java
@@ -18,9 +18,7 @@
*/
package com.datatorrent.lib.appdata.schemas;
-import it.unimi.dsi.fastutil.objects.Object2IntLinkedOpenHashMap;
import java.io.Serializable;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
@@ -28,17 +26,19 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.datatorrent.lib.appdata.gpo.Serde;
import com.datatorrent.lib.appdata.gpo.SerdeObjectPayloadFix;
+import it.unimi.dsi.fastutil.objects.Object2IntLinkedOpenHashMap;
+
/**
* This class manages the storage of fields in app data. It is used in {@link GPOMutable} objects
* to map field names to values in order to respond to queries, it also serves as a schema which is
@@ -118,8 +118,7 @@ public class FieldsDescriptor implements Serializable
* @param fieldToType A mapping from field names to the type of the field.
* @param fieldToSerdeObject A mapping from field names to the corresponding serde object.
*/
- public FieldsDescriptor(Map<String, Type> fieldToType,
- Map<String, Serde> fieldToSerdeObject)
+ public FieldsDescriptor(Map<String, Type> fieldToType, Map<String, Serde> fieldToSerdeObject)
{
setFieldToType(fieldToType);
compressedTypes = Sets.newHashSet();
@@ -128,23 +127,17 @@ public class FieldsDescriptor implements Serializable
List<String> fieldNames = typeToFields.get(Type.OBJECT);
- if(fieldNames == null) {
- throw new IllegalArgumentException("There are no fields of type " + Type.OBJECT +
- " in this fieldsdescriptor");
- }
- else {
+ if (fieldNames == null) {
+ throw new IllegalArgumentException("There are no fields of type " + Type.OBJECT + " in this fieldsdescriptor");
+ } else {
serdes = new Serde[fieldNames.size()];
//Insert serdes in corresponding order
- for(int index = 0;
- index < fieldNames.size();
- index++) {
+ for (int index = 0; index < fieldNames.size(); index++) {
String fieldName = fieldNames.get(index);
Serde serdeObject = fieldToSerdeObject.get(fieldName);
- if(serdeObject == null) {
- throw new IllegalArgumentException("The field "
- + fieldName
- + " doesn't have a serde object.");
+ if (serdeObject == null) {
+ throw new IllegalArgumentException("The field " + fieldName + " doesn't have a serde object.");
}
serdes[index] = serdeObject;
@@ -152,9 +145,8 @@ public class FieldsDescriptor implements Serializable
}
}
- public FieldsDescriptor(Map<String, Type> fieldToType,
- Map<String, Serde> fieldToSerdeObject,
- SerdeObjectPayloadFix serdePayloadFix)
+ public FieldsDescriptor(Map<String, Type> fieldToType, Map<String, Serde> fieldToSerdeObject,
+ SerdeObjectPayloadFix serdePayloadFix)
{
this(fieldToType, fieldToSerdeObject);
this.serdePayloadFix = serdePayloadFix;
@@ -185,13 +177,13 @@ public class FieldsDescriptor implements Serializable
typeToFieldToIndex = Maps.newHashMap();
typeToFields = Maps.newHashMap();
- for(Map.Entry<String, Type> entry: fieldToType.entrySet()) {
+ for (Map.Entry<String, Type> entry : fieldToType.entrySet()) {
String field = entry.getKey();
Type type = entry.getValue();
List<String> fieldsList = typeToFields.get(type);
- if(fieldsList == null) {
+ if (fieldsList == null) {
fieldsList = Lists.newArrayList();
typeToFields.put(type, fieldsList);
}
@@ -200,22 +192,19 @@ public class FieldsDescriptor implements Serializable
}
//ensure consistent ordering of fields
- for(Map.Entry<Type, List<String>> entry: typeToFields.entrySet()) {
+ for (Map.Entry<Type, List<String>> entry : typeToFields.entrySet()) {
Type type = entry.getKey();
List<String> tempFields = entry.getValue();
Collections.sort(tempFields);
Object2IntLinkedOpenHashMap<String> fieldToIndex = new Object2IntLinkedOpenHashMap<String>();
- for(int index = 0;
- index < tempFields.size();
- index++) {
+ for (int index = 0; index < tempFields.size(); index++) {
String field = tempFields.get(index);
- if(compressedTypes.contains(type)) {
+ if (compressedTypes.contains(type)) {
fieldToIndex.put(field, 0);
- }
- else {
+ } else {
fieldToIndex.put(field, index);
}
}
@@ -225,10 +214,9 @@ public class FieldsDescriptor implements Serializable
//Types
- if(!typeToFields.isEmpty()) {
+ if (!typeToFields.isEmpty()) {
types = EnumSet.copyOf(typeToFields.keySet());
- }
- else {
+ } else {
types = Sets.newHashSet();
}
@@ -245,13 +233,12 @@ public class FieldsDescriptor implements Serializable
//Array Sizes
typeToSize = new Object2IntLinkedOpenHashMap<Type>();
- for(Map.Entry<Type, List<String>> entry: typeToFields.entrySet()) {
+ for (Map.Entry<Type, List<String>> entry : typeToFields.entrySet()) {
Type type = entry.getKey();
- if(compressedTypes.contains(type)) {
+ if (compressedTypes.contains(type)) {
getTypeToSize().put(type, 1);
- }
- else {
+ } else {
getTypeToSize().put(type, entry.getValue().size());
}
}
@@ -272,8 +259,9 @@ public class FieldsDescriptor implements Serializable
* @param fieldToType The field to type map to set for this {@link FieldsDescriptor}
* object.
*/
- private void setFieldToType(Map<String, Type> fieldToType) {
- for(Map.Entry<String, Type> entry: fieldToType.entrySet()) {
+ private void setFieldToType(Map<String, Type> fieldToType)
+ {
+ for (Map.Entry<String, Type> entry : fieldToType.entrySet()) {
Preconditions.checkNotNull(entry.getKey());
Preconditions.checkNotNull(entry.getValue());
}
@@ -287,7 +275,7 @@ public class FieldsDescriptor implements Serializable
*/
private void setCompressedTypes(Set<Type> compressedTypes)
{
- for(Type type: compressedTypes) {
+ for (Type type : compressedTypes) {
Preconditions.checkNotNull(type);
}
@@ -319,7 +307,7 @@ public class FieldsDescriptor implements Serializable
*/
public Fields getFields()
{
- if(fields == null) {
+ if (fields == null) {
fields = new Fields(fieldToType.keySet());
}
@@ -336,7 +324,7 @@ public class FieldsDescriptor implements Serializable
{
Map<String, Type> newFieldToType = Maps.newHashMap();
- for(String field: fields.getFields()) {
+ for (String field : fields.getFields()) {
Type type = fieldToType.get(field);
newFieldToType.put(field, type);
}
@@ -426,17 +414,19 @@ public class FieldsDescriptor implements Serializable
@Override
public boolean equals(Object obj)
{
- if(obj == null) {
+ if (obj == null) {
return false;
}
- if(getClass() != obj.getClass()) {
+ if (getClass() != obj.getClass()) {
return false;
}
final FieldsDescriptor other = (FieldsDescriptor)obj;
- if(this.fieldToType != other.fieldToType && (this.fieldToType == null || !this.fieldToType.equals(other.fieldToType))) {
+ if (this.fieldToType != other.fieldToType && (this.fieldToType == null || !this.fieldToType.equals(
+ other.fieldToType))) {
return false;
}
- if(this.compressedTypes != other.compressedTypes && (this.compressedTypes == null || !this.compressedTypes.equals(other.compressedTypes))) {
+ if (this.compressedTypes != other.compressedTypes && (this.compressedTypes == null || !this.compressedTypes.equals(
+ other.compressedTypes))) {
return false;
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/QRBase.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/QRBase.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/QRBase.java
index 6d97e9a..cbeaf2e 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/QRBase.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/QRBase.java
@@ -80,8 +80,7 @@ public abstract class QRBase extends Message
* @param id The query id.
* @param type The type of the query.
*/
- public QRBase(String id,
- String type)
+ public QRBase(String id, String type)
{
super(type);
this.id = Preconditions.checkNotNull(id);
@@ -93,9 +92,7 @@ public abstract class QRBase extends Message
* @param type The type of the query.
* @param countdown The countdown for the query.
*/
- public QRBase(String id,
- String type,
- long countdown)
+ public QRBase(String id, String type, long countdown)
{
this(id, type);
setCountdown(countdown);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/Query.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Query.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Query.java
index 835c599..fc88886 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Query.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Query.java
@@ -62,20 +62,18 @@ public abstract class Query extends QRBase
* @param id The query id.
* @param type The type of the query.
*/
- public Query(String id,
- String type)
+ public Query(String id, String type)
{
super(id, type);
}
+
/**
* Creates a query with the given id, type, and schemaKeys.
* @param id The query id.
* @param type The type of the query.
* @param schemaKeys The schemaKeys for the query.
*/
- public Query(String id,
- String type,
- Map<String, String> schemaKeys)
+ public Query(String id, String type, Map<String, String> schemaKeys)
{
super(id, type);
setSchemaKeys(schemaKeys);
@@ -87,9 +85,7 @@ public abstract class Query extends QRBase
* @param type The type of the query.
* @param countdown The countdown for the query.
*/
- public Query(String id,
- String type,
- long countdown)
+ public Query(String id, String type, long countdown)
{
super(id, type, countdown);
}
@@ -101,10 +97,7 @@ public abstract class Query extends QRBase
* @param countdown The countdown for the query.
* @param schemaKeys The schemaKeys for the query.
*/
- public Query(String id,
- String type,
- long countdown,
- Map<String, String> schemaKeys)
+ public Query(String id, String type, long countdown, Map<String, String> schemaKeys)
{
super(id, type, countdown);
setSchemaKeys(schemaKeys);
@@ -116,11 +109,11 @@ public abstract class Query extends QRBase
*/
private void setSchemaKeys(Map<String, String> schemaKeys)
{
- if(schemaKeys == null) {
+ if (schemaKeys == null) {
return;
}
- for(Map.Entry<String, String> entry: schemaKeys.entrySet()) {
+ for (Map.Entry<String, String> entry : schemaKeys.entrySet()) {
Preconditions.checkNotNull(entry.getKey());
Preconditions.checkNotNull(entry.getValue());
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/Result.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Result.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Result.java
index 0841e13..ea08ce0 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Result.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Result.java
@@ -53,8 +53,7 @@ public abstract class Result extends QRBase
* @param query The query that this result is a response to.
* @param countdown The countdown for this result.
*/
- public Result(Query query,
- long countdown)
+ public Result(Query query, long countdown)
{
super(query.getId());
setQuery(query);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/ResultFormatter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/ResultFormatter.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/ResultFormatter.java
index 44a9c8c..1cd6712 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/ResultFormatter.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/ResultFormatter.java
@@ -114,34 +114,28 @@ public class ResultFormatter implements Serializable
{
Type type = Type.CLASS_TO_TYPE.get(object.getClass());
- if(type == null) {
+ if (type == null) {
return object.toString();
}
- switch(type) {
- case FLOAT:
- {
- return format((float) ((Float) object));
+ switch (type) {
+ case FLOAT: {
+ return format((float)((Float)object));
}
- case DOUBLE:
- {
- return format((double) ((Double) object));
+ case DOUBLE: {
+ return format((double)((Double)object));
}
- case BYTE:
- {
- return format((byte) ((Byte) object));
+ case BYTE: {
+ return format((byte)((Byte)object));
}
- case SHORT:
- {
- return format((short) ((Short) object));
+ case SHORT: {
+ return format((short)((Short)object));
}
- case INTEGER:
- {
- return format((int) ((Integer) object));
+ case INTEGER: {
+ return format((int)((Integer)object));
}
- case LONG:
- {
- return format((long) ((Long) object));
+ case LONG: {
+ return format((long)((Long)object));
}
default:
return object.toString();
@@ -157,7 +151,7 @@ public class ResultFormatter implements Serializable
{
DecimalFormat df = getFloatFormat();
- if(df != null) {
+ if (df != null) {
return df.format(val);
}
@@ -173,7 +167,7 @@ public class ResultFormatter implements Serializable
{
DecimalFormat df = getDoubleFormat();
- if(df != null) {
+ if (df != null) {
return df.format(val);
}
@@ -189,7 +183,7 @@ public class ResultFormatter implements Serializable
{
DecimalFormat df = getByteFormat();
- if(df != null) {
+ if (df != null) {
return df.format(val);
}
@@ -205,7 +199,7 @@ public class ResultFormatter implements Serializable
{
DecimalFormat df = getShortFormat();
- if(df != null) {
+ if (df != null) {
return df.format(val);
}
@@ -221,7 +215,7 @@ public class ResultFormatter implements Serializable
{
DecimalFormat df = getIntFormat();
- if(df != null) {
+ if (df != null) {
return df.format(val);
}
@@ -237,7 +231,7 @@ public class ResultFormatter implements Serializable
{
DecimalFormat df = getLongFormat();
- if(df != null) {
+ if (df != null) {
return df.format(val);
}
@@ -250,7 +244,7 @@ public class ResultFormatter implements Serializable
*/
public DecimalFormat getFloatFormat()
{
- if(floatFormat == null && floatFormatString != null) {
+ if (floatFormat == null && floatFormatString != null) {
floatFormat = new DecimalFormat(floatFormatString);
}
@@ -263,7 +257,7 @@ public class ResultFormatter implements Serializable
*/
public DecimalFormat getDoubleFormat()
{
- if(doubleFormat == null && doubleFormatString != null) {
+ if (doubleFormat == null && doubleFormatString != null) {
doubleFormat = new DecimalFormat(doubleFormatString);
}
@@ -276,7 +270,7 @@ public class ResultFormatter implements Serializable
*/
public DecimalFormat getByteFormat()
{
- if(byteFormat == null && byteFormatString != null) {
+ if (byteFormat == null && byteFormatString != null) {
byteFormat = new DecimalFormat(byteFormatString);
}
@@ -289,7 +283,7 @@ public class ResultFormatter implements Serializable
*/
public DecimalFormat getShortFormat()
{
- if(shortFormat == null && shortFormatString != null) {
+ if (shortFormat == null && shortFormatString != null) {
shortFormat = new DecimalFormat(shortFormatString);
}
@@ -302,7 +296,7 @@ public class ResultFormatter implements Serializable
*/
public DecimalFormat getIntFormat()
{
- if(intFormat == null && intFormatString != null) {
+ if (intFormat == null && intFormatString != null) {
intFormat = new DecimalFormat(intFormatString);
}
@@ -315,7 +309,7 @@ public class ResultFormatter implements Serializable
*/
public DecimalFormat getLongFormat()
{
- if(longFormat == null && longFormatString != null) {
+ if (longFormat == null && longFormatString != null) {
longFormat = new DecimalFormat(longFormatString);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
index e2e21da..8260c81 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
@@ -41,26 +41,31 @@ public interface Schema
* @return The id of the schema.
*/
public int getSchemaID();
+
/**
* Gets the type of the schema (e.x. point, dimensions).
* @return The type of the schema.
*/
public String getSchemaType();
+
/**
* Gets the version of the schema.
* @return The version of the schema.
*/
public String getSchemaVersion();
+
/**
* Gets the AppData json to serve in response to a schema query.
* @return The AppData json to serve in response to a schema query.
*/
public String getSchemaJSON();
+
/**
* Gets the schema keys which are used to send queries targeted to this schema.
* @return The schema keys which are used to send queries targeted to this schema.
*/
public Map<String, String> getSchemaKeys();
+
/**
* Sets the schema keys for this schema.
* @param schemaKeys The schema keys for this schema.
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaQuery.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaQuery.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaQuery.java
index 8b9cfe0..a3fd69d 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaQuery.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaQuery.java
@@ -33,9 +33,9 @@ import com.datatorrent.lib.appdata.query.serde.SimpleDataValidator;
* This class represents a schema query.
* @since 3.0.0
*/
-@MessageType(type=SchemaQuery.TYPE)
-@MessageDeserializerInfo(clazz=SchemaQueryDeserializer.class)
-@MessageValidatorInfo(clazz=SimpleDataValidator.class)
+@MessageType(type = SchemaQuery.TYPE)
+@MessageDeserializerInfo(clazz = SchemaQueryDeserializer.class)
+@MessageValidatorInfo(clazz = SimpleDataValidator.class)
public class SchemaQuery extends Query
{
public static final String FIELD_CONTEXT = "context";
@@ -70,15 +70,12 @@ public class SchemaQuery extends Query
* @param id The id of the query.
* @param schemaKeys The schema keys for the requested schema.
*/
- public SchemaQuery(String id,
- Map<String, String> schemaKeys)
+ public SchemaQuery(String id, Map<String, String> schemaKeys)
{
super(id, TYPE, schemaKeys);
}
- public SchemaQuery(String id,
- Map<String, String> schemaKeys,
- Map<String, String> contextKeys)
+ public SchemaQuery(String id, Map<String, String> schemaKeys, Map<String, String> contextKeys)
{
super(id, TYPE);
this.schemaKeys = schemaKeys;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistry.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistry.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistry.java
index 93e8bf2..374e16c 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistry.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistry.java
@@ -34,17 +34,20 @@ public interface SchemaRegistry
* @return The schema result.
*/
public SchemaResult getSchemaResult(SchemaQuery schemaQuery);
+
/**
* Registers the given schema with this schema registry.
* @param schema The schema to register with this registry.
*/
public void registerSchema(Schema schema);
+
/**
* Registers the given schema with the given schema keys.
* @param schema The schema to register.
* @param schemaKeys The schema keys that correspond with the given schema.
*/
public void registerSchema(Schema schema, Map<String, String> schemaKeys);
+
/**
* Gets the schema corresponding to the given schema keys.
* @param schemaKeys The schema keys for a schema.
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java
index ee63af8..eaabcbf 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java
@@ -18,14 +18,14 @@
*/
package com.datatorrent.lib.appdata.schemas;
-import com.google.common.base.Preconditions;
import java.io.Serializable;
-
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import com.google.common.base.Preconditions;
+
import com.datatorrent.lib.appdata.datastructs.DimensionalTable;
/**
@@ -66,8 +66,7 @@ public class SchemaRegistryMultiple implements SchemaRegistry, Serializable
* @param schemaComparator The comparator used to order the schemas returned in the {@link SchemaResult} produced
* by {@link SchemaRegistryMultiple#getSchemaResult(com.datatorrent.lib.appdata.schemas.SchemaQuery)}
*/
- public SchemaRegistryMultiple(List<String> schemaKeys,
- Comparator<Schema> schemaComparator)
+ public SchemaRegistryMultiple(List<String> schemaKeys, Comparator<Schema> schemaComparator)
{
this(schemaKeys);
this.schemaComparator = Preconditions.checkNotNull(schemaComparator);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistrySingle.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistrySingle.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistrySingle.java
index 8770fb2..12c5434 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistrySingle.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistrySingle.java
@@ -19,14 +19,13 @@
package com.datatorrent.lib.appdata.schemas;
import java.io.Serializable;
-
import java.util.Map;
-import com.google.common.base.Preconditions;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
* This schema registry holds a single schema. It is intended to be used in operators
* which serve a single schema.
@@ -61,10 +60,8 @@ public class SchemaRegistrySingle implements SchemaRegistry, Serializable
private void setSchema(Schema schema)
{
Preconditions.checkNotNull(schema);
- Preconditions.checkArgument(schema.getSchemaKeys() == null,
- "The provided schema should not have schema keys "
- + schema.getSchemaKeys()
- + " since they will never be used.");
+ Preconditions.checkArgument(schema.getSchemaKeys() == null, "The provided schema should not have schema keys "
+ + schema.getSchemaKeys() + " since they will never be used.");
this.schema = schema;
}
@@ -74,8 +71,9 @@ public class SchemaRegistrySingle implements SchemaRegistry, Serializable
{
Preconditions.checkNotNull(schemaQuery, "This should never happen.");
- if(schemaQuery.getSchemaKeys() != null) {
- logger.error("Schema keys in the given query don't apply for single schema registry: schemaKeys={}", schemaQuery.getSchemaKeys());
+ if (schemaQuery.getSchemaKeys() != null) {
+ logger.error("Schema keys in the given query don't apply for single schema registry: schemaKeys={}",
+ schemaQuery.getSchemaKeys());
return null;
}
@@ -104,6 +102,6 @@ public class SchemaRegistrySingle implements SchemaRegistry, Serializable
@Override
public int size()
{
- return schema == null ? 0: 1;
+ return schema == null ? 0 : 1;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResult.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResult.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResult.java
index 616dce9..b7cc6bc 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResult.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResult.java
@@ -30,8 +30,8 @@ import com.datatorrent.lib.appdata.query.serde.MessageType;
* as a result to a {@link SchemaQuery}.
* @since 3.0.0
*/
-@MessageType(type=SchemaResult.TYPE)
-@MessageSerializerInfo(clazz=SchemaResultSerializer.class)
+@MessageType(type = SchemaResult.TYPE)
+@MessageSerializerInfo(clazz = SchemaResultSerializer.class)
public class SchemaResult extends Result
{
/**
@@ -50,8 +50,7 @@ public class SchemaResult extends Result
* @param schemaQuery
* @param genericSchemas
*/
- public SchemaResult(SchemaQuery schemaQuery,
- Schema... genericSchemas)
+ public SchemaResult(SchemaQuery schemaQuery, Schema... genericSchemas)
{
super(schemaQuery);
setGenericSchemas(genericSchemas);
@@ -63,8 +62,7 @@ public class SchemaResult extends Result
* @param schemaQuery The schema query which this schema result will be a response to.
* @param genericSchemas The schemas to return in the schema result payload.
*/
- public SchemaResult(SchemaQuery schemaQuery,
- List<Schema> genericSchemas)
+ public SchemaResult(SchemaQuery schemaQuery, List<Schema> genericSchemas)
{
super(schemaQuery);
setGenericSchemas(genericSchemas);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResultSerializer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResultSerializer.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResultSerializer.java
index 133affd..d9fd92c 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResultSerializer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResultSerializer.java
@@ -43,14 +43,11 @@ public class SchemaResultSerializer implements CustomMessageSerializer
@Override
public String serialize(Message message, ResultFormatter resultFormatter)
{
- if(!(message instanceof SchemaResult))
- {
- throw new IllegalArgumentException("Must receive a "
- + SchemaResult.class
- + " object.");
+ if (!(message instanceof SchemaResult)) {
+ throw new IllegalArgumentException("Must receive a " + SchemaResult.class + " object.");
}
- SchemaResult genericSchemaResult = (SchemaResult) message;
+ SchemaResult genericSchemaResult = (SchemaResult)message;
StringBuilder sb = new StringBuilder();
@@ -58,20 +55,16 @@ public class SchemaResultSerializer implements CustomMessageSerializer
logger.debug("result id {}", genericSchemaResult.getId());
logger.debug("result type {}", genericSchemaResult.getType());
- sb.append("{\"").append(Result.FIELD_ID).
- append("\":\"").append(genericSchemaResult.getId()).
- append("\",\"").append(Result.FIELD_TYPE).
- append("\":\"").append(genericSchemaResult.getType()).
- append("\",\"").append(Result.FIELD_DATA).
- append("\":");
+ sb.append("{\"").append(Result.FIELD_ID).append("\":\"").append(genericSchemaResult.getId())
+ .append("\",\"").append(Result.FIELD_TYPE).append("\":\"").append(genericSchemaResult.getType())
+ .append("\",\"").append(Result.FIELD_DATA).append("\":");
JSONArray schemaArray = new JSONArray();
- for(Schema schema: genericSchemaResult.getGenericSchemas()) {
+ for (Schema schema : genericSchemaResult.getGenericSchemas()) {
try {
schemaArray.put(new JSONObject(schema.getSchemaJSON()));
- }
- catch(JSONException ex) {
+ } catch (JSONException ex) {
throw new RuntimeException(ex);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
index dcfbc8d..b628662 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
@@ -69,10 +69,8 @@ public class SchemaUtils
InputStream is = SchemaUtils.class.getClassLoader().getResourceAsStream(resource);
Preconditions.checkArgument(is != null, resource + " could not be found in the resources.");
- IOUtils.copy(is,
- stringWriter);
- }
- catch(IOException ex) {
+ IOUtils.copy(is, stringWriter);
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
return stringWriter.toString();
@@ -89,8 +87,7 @@ public class SchemaUtils
try {
IOUtils.copy(inputStream, stringWriter);
- }
- catch(IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
@@ -103,8 +100,7 @@ public class SchemaUtils
* @param fields The keys in the {@link JSONObject} to check.
* @return True if the given {@link JSONObject} contains all the given keys. False otherwise.
*/
- public static boolean checkValidKeys(JSONObject jo,
- Fields fields)
+ public static boolean checkValidKeys(JSONObject jo, Fields fields)
{
@SuppressWarnings("unchecked")
Set<String> fieldSet = fields.getFields();
@@ -119,8 +115,7 @@ public class SchemaUtils
* @param jo The {@link JSONObject} to check.
* @param fields The keys in the {@link JSONObject} to check.
*/
- public static void checkValidKeysEx(JSONObject jo,
- Fields fields)
+ public static void checkValidKeysEx(JSONObject jo, Fields fields)
{
@SuppressWarnings("unchecked")
Set<String> fieldSet = fields.getFields();
@@ -128,10 +123,8 @@ public class SchemaUtils
if (!jsonKeys.containsAll(fieldSet)) {
- throw new IllegalArgumentException("The given set of keys "
- + fieldSet
- + " doesn't equal the set of keys in the json "
- + jsonKeys);
+ throw new IllegalArgumentException("The given set of keys " + fieldSet
+ + " doesn't equal the set of keys in the json " + jsonKeys);
}
}
@@ -141,15 +134,12 @@ public class SchemaUtils
* @param fieldsCollection The keys in the {@link JSONObject} to check.
* @return True if the given {@link JSONObject} contains all the given keys. False otherwise.
*/
- public static boolean checkValidKeys(JSONObject jo,
- Collection<Fields> fieldsCollection)
+ public static boolean checkValidKeys(JSONObject jo, Collection<Fields> fieldsCollection)
{
- return checkValidKeysHelper(jo,
- fieldsCollection);
+ return checkValidKeysHelper(jo, fieldsCollection);
}
- private static boolean checkValidKeysHelper(JSONObject jo,
- Collection<Fields> fieldsCollection)
+ private static boolean checkValidKeysHelper(JSONObject jo, Collection<Fields> fieldsCollection)
{
for (Fields fields: fieldsCollection) {
LOG.debug("Checking keys: {}", fields);
@@ -158,17 +148,14 @@ public class SchemaUtils
}
}
- LOG.error("The first level of keys in the provided JSON {} do not match any of the " +
- "valid keysets {}",
- getSetOfJSONKeys(jo),
- fieldsCollection);
+ LOG.error("The first level of keys in the provided JSON {} do not match any of the " + "valid keysets {}",
+ getSetOfJSONKeys(jo), fieldsCollection);
return false;
}
public static boolean checkValidKeys(JSONObject jo, List<Fields> fieldsCollection)
{
- return checkValidKeysHelper(jo,
- fieldsCollection);
+ return checkValidKeysHelper(jo, fieldsCollection);
}
/**
@@ -178,15 +165,12 @@ public class SchemaUtils
* @param fieldsCollection The keys in the {@link JSONObject} to check.
* @return True if the given {@link JSONObject} contains all the given keys. False otherwise.
*/
- public static boolean checkValidKeysEx(JSONObject jo,
- Collection<Fields> fieldsCollection)
+ public static boolean checkValidKeysEx(JSONObject jo, Collection<Fields> fieldsCollection)
{
- return checkValidKeysExHelper(jo,
- fieldsCollection);
+ return checkValidKeysExHelper(jo, fieldsCollection);
}
- public static boolean checkValidKeysExHelper(JSONObject jo,
- Collection<Fields> fieldsCollection)
+ public static boolean checkValidKeysExHelper(JSONObject jo, Collection<Fields> fieldsCollection)
{
for (Fields fields: fieldsCollection) {
if (checkValidKeys(jo, fields)) {
@@ -196,10 +180,8 @@ public class SchemaUtils
Set<String> keys = getSetOfJSONKeys(jo);
- throw new IllegalArgumentException("The given json object has an invalid set of keys: " +
- keys +
- "\nOne of the following key combinations was expected:\n" +
- fieldsCollection);
+ throw new IllegalArgumentException("The given json object has an invalid set of keys: " + keys
+ + "\nOne of the following key combinations was expected:\n" + fieldsCollection);
}
public static boolean checkValidKeysEx(JSONObject jo, List<Fields> fieldsCollection)
@@ -225,7 +207,7 @@ public class SchemaUtils
{
Map<String, String> fieldToTypeString = Maps.newHashMap();
- for(Map.Entry<String, Type> entry: fieldToType.entrySet()) {
+ for (Map.Entry<String, Type> entry : fieldToType.entrySet()) {
String field = entry.getKey();
String typeString = entry.getValue().name();
@@ -237,12 +219,11 @@ public class SchemaUtils
public static JSONArray findFirstKeyJSONArray(JSONObject jo, String key)
{
- if(jo.has(key)) {
+ if (jo.has(key)) {
try {
JSONArray jsonArray = jo.getJSONArray(key);
return jsonArray;
- }
- catch(JSONException ex) {
+ } catch (JSONException ex) {
throw new RuntimeException(ex);
}
}
@@ -250,22 +231,21 @@ public class SchemaUtils
@SuppressWarnings("unchecked")
Iterator<String> keyIterator = jo.keys();
- while(keyIterator.hasNext()) {
+ while (keyIterator.hasNext()) {
String childKey = keyIterator.next();
JSONArray childJa = null;
try {
childJa = jo.getJSONArray(childKey);
- }
- catch(JSONException ex) {
+ } catch (JSONException ex) {
//Do nothing
}
- if(childJa != null) {
+ if (childJa != null) {
JSONArray result = findFirstKeyJSONArray(childJa, key);
- if(result != null) {
+ if (result != null) {
return result;
}
@@ -276,15 +256,14 @@ public class SchemaUtils
try {
childJo = jo.getJSONObject(childKey);
- }
- catch(JSONException ex) {
+ } catch (JSONException ex) {
//Do nothing
}
- if(childJo != null) {
+ if (childJo != null) {
JSONArray result = findFirstKeyJSONArray(childJo, key);
- if(result != null) {
+ if (result != null) {
return result;
}
}
@@ -295,22 +274,19 @@ public class SchemaUtils
public static JSONArray findFirstKeyJSONArray(JSONArray ja, String key)
{
- for(int index = 0;
- index < ja.length();
- index++) {
+ for (int index = 0; index < ja.length(); index++) {
JSONArray childJa = null;
try {
childJa = ja.getJSONArray(index);
- }
- catch(JSONException ex) {
+ } catch (JSONException ex) {
//Do nothing
}
- if(childJa != null) {
+ if (childJa != null) {
JSONArray result = findFirstKeyJSONArray(childJa, key);
- if(result != null) {
+ if (result != null) {
return result;
}
@@ -321,15 +297,14 @@ public class SchemaUtils
try {
childJo = ja.getJSONObject(index);
- }
- catch(JSONException ex) {
+ } catch (JSONException ex) {
//Do nothing
}
- if(childJo != null) {
+ if (childJo != null) {
JSONArray result = findFirstKeyJSONArray(childJo, key);
- if(result != null) {
+ if (result != null) {
return result;
}
}
@@ -340,12 +315,11 @@ public class SchemaUtils
public static JSONObject findFirstKeyJSONObject(JSONObject jo, String key)
{
- if(jo.has(key)) {
+ if (jo.has(key)) {
try {
JSONObject jsonObject = jo.getJSONObject(key);
return jsonObject;
- }
- catch(JSONException ex) {
+ } catch (JSONException ex) {
throw new RuntimeException(ex);
}
}
@@ -353,22 +327,21 @@ public class SchemaUtils
@SuppressWarnings("unchecked")
Iterator<String> keyIterator = jo.keys();
- while(keyIterator.hasNext()) {
+ while (keyIterator.hasNext()) {
String childKey = keyIterator.next();
JSONArray childJa = null;
try {
childJa = jo.getJSONArray(childKey);
- }
- catch(JSONException ex) {
+ } catch (JSONException ex) {
//Do nothing
}
- if(childJa != null) {
+ if (childJa != null) {
JSONObject result = findFirstKeyJSONObject(childJa, key);
- if(result != null) {
+ if (result != null) {
return result;
}
@@ -379,15 +352,14 @@ public class SchemaUtils
try {
childJo = jo.getJSONObject(childKey);
- }
- catch(JSONException ex) {
+ } catch (JSONException ex) {
//Do nothing
}
- if(childJo != null) {
+ if (childJo != null) {
JSONObject result = findFirstKeyJSONObject(childJo, key);
- if(result != null) {
+ if (result != null) {
return result;
}
}
@@ -398,22 +370,19 @@ public class SchemaUtils
public static JSONObject findFirstKeyJSONObject(JSONArray ja, String key)
{
- for(int index = 0;
- index < ja.length();
- index++) {
+ for (int index = 0; index < ja.length(); index++) {
JSONArray childJa = null;
try {
childJa = ja.getJSONArray(index);
- }
- catch(JSONException ex) {
+ } catch (JSONException ex) {
//Do nothing
}
- if(childJa != null) {
+ if (childJa != null) {
JSONObject result = findFirstKeyJSONObject(childJa, key);
- if(result != null) {
+ if (result != null) {
return result;
}
@@ -424,15 +393,14 @@ public class SchemaUtils
try {
childJo = ja.getJSONObject(index);
- }
- catch(JSONException ex) {
+ } catch (JSONException ex) {
//Do nothing
}
- if(childJo != null) {
+ if (childJo != null) {
JSONObject result = findFirstKeyJSONObject(childJo, key);
- if(result != null) {
+ if (result != null) {
return result;
}
}
@@ -452,14 +420,13 @@ public class SchemaUtils
@SuppressWarnings("unchecked")
Iterator<String> keyIterator = jo.keys();
- while(keyIterator.hasNext()) {
+ while (keyIterator.hasNext()) {
String key = keyIterator.next();
String value;
try {
value = jo.getString(key);
- }
- catch(JSONException ex) {
+ } catch (JSONException ex) {
throw new RuntimeException(ex);
}
@@ -478,14 +445,13 @@ public class SchemaUtils
{
JSONObject jo = new JSONObject();
- for(Map.Entry<String, String> entry: map.entrySet()) {
+ for (Map.Entry<String, String> entry : map.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
try {
jo.put(key, value);
- }
- catch(JSONException ex) {
+ } catch (JSONException ex) {
throw new RuntimeException(ex);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
index aed9013..5010580 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
@@ -22,13 +22,13 @@ import java.util.Collections;
import java.util.Map;
import java.util.Set;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
import com.datatorrent.netlet.util.DTThrowable;
/**
@@ -118,8 +118,7 @@ public class SnapshotSchema implements Schema
* @param schemaJSON The JSON defining this schema.
* @param schemaKeys The schema keys tied to this schema.
*/
- public SnapshotSchema(String schemaJSON,
- Map<String, String> schemaKeys)
+ public SnapshotSchema(String schemaJSON, Map<String, String> schemaKeys)
{
this(schemaJSON);
@@ -133,12 +132,9 @@ public class SnapshotSchema implements Schema
* @param schemaJSON The schemaJSON this schema is built from.
* @param schemaKeys The schemaKeys associated with this schema.
*/
- public SnapshotSchema(int schemaID,
- String schemaJSON,
- Map<String, String> schemaKeys)
+ public SnapshotSchema(int schemaID, String schemaJSON, Map<String, String> schemaKeys)
{
- this(schemaJSON,
- schemaKeys);
+ this(schemaJSON, schemaKeys);
this.schemaID = schemaID;
}
@@ -153,8 +149,7 @@ public class SnapshotSchema implements Schema
try {
initialize();
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
DTThrowable.rethrow(ex);
}
}
@@ -165,8 +160,7 @@ public class SnapshotSchema implements Schema
* @param schemaID The schemaID associated with this schema.
* @param schemaJSON The JSON that this schema is constructed from.
*/
- public SnapshotSchema(int schemaID,
- String schemaJSON)
+ public SnapshotSchema(int schemaID, String schemaJSON)
{
this(schemaJSON);
this.schemaID = schemaID;
@@ -177,12 +171,12 @@ public class SnapshotSchema implements Schema
{
changed = true;
- if(schemaKeys == null) {
+ if (schemaKeys == null) {
this.schemaKeys = null;
return;
}
- for(Map.Entry<String, String> entry: schemaKeys.entrySet()) {
+ for (Map.Entry<String, String> entry : schemaKeys.entrySet()) {
Preconditions.checkNotNull(entry.getKey());
Preconditions.checkNotNull(entry.getValue());
}
@@ -199,9 +193,8 @@ public class SnapshotSchema implements Schema
{
schema = new JSONObject(schemaJSON);
- if(schemaKeys != null) {
- schema.put(Schema.FIELD_SCHEMA_KEYS,
- SchemaUtils.createJSONObject(schemaKeys));
+ if (schemaKeys != null) {
+ schema.put(Schema.FIELD_SCHEMA_KEYS, SchemaUtils.createJSONObject(schemaKeys));
}
valueToType = Maps.newHashMap();
@@ -209,12 +202,9 @@ public class SnapshotSchema implements Schema
JSONArray values = schema.getJSONArray(FIELD_VALUES);
Preconditions.checkState(values.length() > 0,
- "The schema does not specify any values.");
+ "The schema does not specify any values.");
- for(int index = 0;
- index < values.length();
- index++)
- {
+ for (int index = 0; index < values.length(); index++) {
JSONObject value = values.getJSONObject(index);
String name = value.getString(FIELD_VALUES_NAME);
String typeName = value.getString(FIELD_VALUES_TYPE);
@@ -222,9 +212,7 @@ public class SnapshotSchema implements Schema
Type type = Type.NAME_TO_TYPE.get(typeName);
valueToType.put(name, type);
- Preconditions.checkArgument(type != null,
- typeName
- + " is not a valid type.");
+ Preconditions.checkArgument(type != null, typeName + " is not a valid type.");
}
valueToType = Collections.unmodifiableMap(valueToType);
@@ -233,8 +221,7 @@ public class SnapshotSchema implements Schema
try {
schema.put(FIELD_SCHEMA_TYPE, SCHEMA_TYPE);
schema.put(FIELD_SCHEMA_VERSION, SCHEMA_VERSION);
- }
- catch(JSONException e) {
+ } catch (JSONException e) {
throw new RuntimeException(e);
}
@@ -243,8 +230,9 @@ public class SnapshotSchema implements Schema
public void setTags(Set<String> tags)
{
- if (tags == null || tags.isEmpty())
+ if (tags == null || tags.isEmpty()) {
throw new IllegalArgumentException("tags can't be null or empty.");
+ }
try {
JSONArray tagArray = new JSONArray(tags);
@@ -270,19 +258,16 @@ public class SnapshotSchema implements Schema
@Override
public String getSchemaJSON()
{
- if(!changed && schemaJSON != null) {
+ if (!changed && schemaJSON != null) {
return schemaJSON;
}
- if(schemaKeys == null) {
+ if (schemaKeys == null) {
schema.remove(Schema.FIELD_SCHEMA_KEYS);
- }
- else {
+ } else {
try {
- schema.put(Schema.FIELD_SCHEMA_KEYS,
- SchemaUtils.createJSONObject(schemaKeys));
- }
- catch(JSONException ex) {
+ schema.put(Schema.FIELD_SCHEMA_KEYS, SchemaUtils.createJSONObject(schemaKeys));
+ } catch (JSONException ex) {
throw new RuntimeException(ex);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/TimeBucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/TimeBucket.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/TimeBucket.java
index f1f24bc..5570ca0 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/TimeBucket.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/TimeBucket.java
@@ -91,13 +91,11 @@ public enum TimeBucket
*/
public static final Map<TimeUnit, TimeBucket> TIME_UNIT_TO_TIME_BUCKET;
- static
- {
+ static {
Map<String, TimeBucket> bucketToType = Maps.newHashMap();
Map<TimeUnit, TimeBucket> timeUnitToTimeBucket = Maps.newHashMap();
- for(TimeBucket timeBucket: TimeBucket.values())
- {
+ for (TimeBucket timeBucket : TimeBucket.values()) {
timeUnitToTimeBucket.put(timeBucket.getTimeUnit(), timeBucket);
bucketToType.put(timeBucket.getText(), timeBucket);
}
@@ -188,7 +186,7 @@ public enum TimeBucket
*/
public long roundDown(long timestamp)
{
- if(timeUnit == null) {
+ if (timeUnit == null) {
return 0;
}
@@ -218,8 +216,7 @@ public enum TimeBucket
public static TimeBucket getBucketEx(String name)
{
TimeBucket bucket = getBucket(name);
- Preconditions.checkArgument(bucket != null,
- name + " is not a valid bucket type.");
+ Preconditions.checkArgument(bucket != null, name + " is not a valid bucket type.");
return bucket;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/Type.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Type.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Type.java
index 1748e17..5e79466 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Type.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Type.java
@@ -106,7 +106,7 @@ public enum Type implements Serializable
static {
Map<String, Type> nameToType = Maps.newHashMap();
- for(Type type: Type.values()) {
+ for (Type type : Type.values()) {
nameToType.put(type.getName(), type);
}
@@ -114,7 +114,7 @@ public enum Type implements Serializable
Map<Class<?>, Type> clazzToType = Maps.newHashMap();
- for(Type type: Type.values()) {
+ for (Type type : Type.values()) {
clazzToType.put(type.getClazz(), type);
}
@@ -153,11 +153,7 @@ public enum Type implements Serializable
* @param clazz The Class of the corresponding Java type.
* @param higherTypes The set of types to which this type can be promoted.
*/
- Type(String name,
- int byteSize,
- JSONType jsonType,
- Class<?> clazz,
- Set<Type> higherTypes)
+ Type(String name, int byteSize, JSONType jsonType, Class<?> clazz, Set<Type> higherTypes)
{
this.name = name;
this.byteSize = byteSize;
@@ -247,8 +243,7 @@ public enum Type implements Serializable
{
Type type = getType(name);
- Preconditions.checkArgument(type != null,
- name + " is not a valid type.");
+ Preconditions.checkArgument(type != null, name + " is not a valid type.");
return type;
}
@@ -262,50 +257,41 @@ public enum Type implements Serializable
*/
public static Object promote(Type from, Type to, Object o)
{
- if(from == to) {
+ if (from == to) {
return o;
}
- Preconditions.checkArgument(!(from == Type.BOOLEAN
- || from == Type.CHAR
- || from == Type.LONG
- || from == Type.DOUBLE),
- "Cannot convert "
- + Type.BOOLEAN.getName() + ", "
- + Type.CHAR.getName() + ", "
- + Type.LONG.getName() + ", or "
- + Type.DOUBLE + " to a larger type.");
+ Preconditions.checkArgument(!(from == Type.BOOLEAN || from == Type.CHAR || from == Type.LONG
+ || from == Type.DOUBLE), "Cannot convert " + Type.BOOLEAN.getName() + ", " + Type.CHAR.getName()
+ + ", " + Type.LONG.getName() + ", or " + Type.DOUBLE + " to a larger type.");
Preconditions.checkArgument(from.getHigherTypes().contains(to),
- from.getName() + " cannot be promoted to " + to.getName());
+ from.getName() + " cannot be promoted to " + to.getName());
- if(from == Type.FLOAT && to == Type.DOUBLE) {
+ if (from == Type.FLOAT && to == Type.DOUBLE) {
return (Double)((Float)o).doubleValue();
}
- if(from == Type.BYTE) {
- if(to == Type.SHORT) {
+ if (from == Type.BYTE) {
+ if (to == Type.SHORT) {
return (Short)((Byte)o).shortValue();
- }
- else if(to == Type.INTEGER) {
+ } else if (to == Type.INTEGER) {
return (Integer)((Byte)o).intValue();
- }
- else if(to == Type.LONG) {
+ } else if (to == Type.LONG) {
return (Long)((Byte)o).longValue();
}
}
- if(from == Type.SHORT) {
- if(to == Type.INTEGER) {
+ if (from == Type.SHORT) {
+ if (to == Type.INTEGER) {
return (Integer)((Short)o).intValue();
- }
- else if(to == Type.LONG) {
+ } else if (to == Type.LONG) {
return (Long)((Short)o).longValue();
}
}
- if(from == Type.INTEGER
- && to == Type.LONG) {
+ if (from == Type.INTEGER
+ && to == Type.LONG) {
return (Long)((Integer)o).longValue();
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
index 236735f..0b03e79 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
@@ -111,8 +111,8 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
private Set<String> tags;
@AppData.QueryPort
- @InputPortFieldAnnotation(optional=true)
- public transient final DefaultInputPort<String> query = new DefaultInputPort<String>()
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<String> query = new DefaultInputPort<String>()
{
@Override
public void process(String queryJSON)
@@ -151,7 +151,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
}
}
- public transient final DefaultInputPort<List<INPUT_EVENT>> input = new DefaultInputPort<List<INPUT_EVENT>>()
+ public final transient DefaultInputPort<List<INPUT_EVENT>> input = new DefaultInputPort<List<INPUT_EVENT>>()
{
@Override
public void process(List<INPUT_EVENT> rows)
@@ -187,7 +187,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
@Override
- final public void activate(OperatorContext ctx)
+ public final void activate(OperatorContext ctx)
{
if (embeddableQueryInfoProvider != null) {
embeddableQueryInfoProvider.activate(ctx);
@@ -213,8 +213,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
if (embeddableQueryInfoProvider != null) {
embeddableQueryInfoProvider.enableEmbeddedMode();
LOG.info("An embeddable query operator is being used of class {}.", embeddableQueryInfoProvider.getClass().getName());
- StoreUtils.attachOutputPortToInputPort(embeddableQueryInfoProvider.getOutputPort(),
- query);
+ StoreUtils.attachOutputPortToInputPort(embeddableQueryInfoProvider.getOutputPort(), query);
embeddableQueryInfoProvider.setup(context);
}
}
@@ -222,8 +221,9 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
protected void setupSchema()
{
schema = new SnapshotSchema(snapshotSchemaJSON);
- if (tags != null && !tags.isEmpty())
+ if (tags != null && !tags.isEmpty()) {
schema.setTags(tags);
+ }
}
protected void setupQueryProcessor()
@@ -252,7 +252,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
{
Result result;
- while((result = queryProcessor.process()) != null) {
+ while ((result = queryProcessor.process()) != null) {
String resultJSON = resultSerializerFactory.serialize(result);
LOG.debug("emitting {}", resultJSON);
queryResult.emit(resultJSON);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMap.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMap.java
index 8134ff9..0fc4200 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMap.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMap.java
@@ -23,13 +23,14 @@ import java.util.Map;
import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.datatorrent.lib.appdata.gpo.GPOMutable;
import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This operator accepts a list of Map<String,Object> objects, and serves the data under the {@link SnapshotSchema}.
@@ -59,9 +60,7 @@ public class AppDataSnapshotServerMap extends AbstractAppDataSnapshotServer<Map<
List<String> fields = fd.getFieldList();
- for(int index = 0;
- index < fields.size();
- index++) {
+ for (int index = 0; index < fields.size(); index++) {
String field = fields.get(index);
values.setFieldGeneric(field, inputEvent.get(getMapField(field)));
}
@@ -77,13 +76,13 @@ public class AppDataSnapshotServerMap extends AbstractAppDataSnapshotServer<Map<
*/
private String getMapField(String field)
{
- if(tableFieldToMapField == null) {
+ if (tableFieldToMapField == null) {
return field;
}
String mapField = tableFieldToMapField.get(field);
- if(mapField == null) {
+ if (mapField == null) {
return field;
}
@@ -111,7 +110,7 @@ public class AppDataSnapshotServerMap extends AbstractAppDataSnapshotServer<Map<
{
Preconditions.checkNotNull(tableFieldToMapField);
- for(Map.Entry<String, String> entry: tableFieldToMapField.entrySet()) {
+ for (Map.Entry<String, String> entry : tableFieldToMapField.entrySet()) {
Preconditions.checkNotNull(entry.getKey());
Preconditions.checkNotNull(entry.getValue());
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerPOJO.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerPOJO.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerPOJO.java
index 68d9017..ab3c760 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerPOJO.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerPOJO.java
@@ -77,7 +77,7 @@ public class AppDataSnapshotServerPOJO extends AbstractAppDataSnapshotServer<Obj
*/
private void firstTuple(Object inputEvent)
{
- if(firstTupleProcessed) {
+ if (firstTupleProcessed) {
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/codec/JavaSerializationStreamCodec.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/codec/JavaSerializationStreamCodec.java b/library/src/main/java/com/datatorrent/lib/codec/JavaSerializationStreamCodec.java
index b82fc14..1096a72 100644
--- a/library/src/main/java/com/datatorrent/lib/codec/JavaSerializationStreamCodec.java
+++ b/library/src/main/java/com/datatorrent/lib/codec/JavaSerializationStreamCodec.java
@@ -41,37 +41,37 @@ import com.datatorrent.netlet.util.Slice;
*/
public class JavaSerializationStreamCodec<T extends Serializable> implements StreamCodec<T>, Serializable
{
- @Override
- public Object fromByteArray(Slice fragment)
- {
- ByteArrayInputStream bis = new ByteArrayInputStream(fragment.buffer,
- fragment.offset, fragment.length);
- try {
- ObjectInputStream ois = new ObjectInputStream(bis);
- return ois.readObject();
- } catch (Exception ioe) {
- throw new RuntimeException(ioe);
- }
- }
+ @Override
+ public Object fromByteArray(Slice fragment)
+ {
+ ByteArrayInputStream bis = new ByteArrayInputStream(fragment.buffer,
+ fragment.offset, fragment.length);
+ try {
+ ObjectInputStream ois = new ObjectInputStream(bis);
+ return ois.readObject();
+ } catch (Exception ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
- @Override
- public Slice toByteArray(T object)
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- try {
- ObjectOutputStream oos = new ObjectOutputStream(bos);
- oos.writeObject(object);
- oos.flush();
- byte[] buffer = bos.toByteArray();
- return new Slice(buffer, 0, buffer.length);
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- }
+ @Override
+ public Slice toByteArray(T object)
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try {
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(object);
+ oos.flush();
+ byte[] buffer = bos.toByteArray();
+ return new Slice(buffer, 0, buffer.length);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
- @Override
- public int getPartition(T o)
- {
- return o.hashCode();
- }
+ @Override
+ public int getPartition(T o)
+ {
+ return o.hashCode();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java b/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java
index 7342cfd..33e5bf6 100644
--- a/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java
@@ -18,10 +18,11 @@
*/
package com.datatorrent.lib.converter;
-import com.datatorrent.common.util.BaseOperator;
+import java.nio.charset.Charset;
+
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
-import java.nio.charset.Charset;
+import com.datatorrent.common.util.BaseOperator;
/**
* This operator converts Byte Array to String. User gets the option of providing character Encoding.
@@ -55,7 +56,7 @@ public class ByteArrayToStringConverterOperator extends BaseOperator
@Override
public void process(byte[] message)
{
- output.emit(characterEncoding == null? new String(message): new String(message, characterEncoding));
+ output.emit(characterEncoding == null ? new String(message) : new String(message, characterEncoding));
}
};
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/converter/Converter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/converter/Converter.java b/library/src/main/java/com/datatorrent/lib/converter/Converter.java
index f1d4325..ef999e4 100644
--- a/library/src/main/java/com/datatorrent/lib/converter/Converter.java
+++ b/library/src/main/java/com/datatorrent/lib/converter/Converter.java
@@ -36,8 +36,7 @@ public interface Converter<INPUT, OUTPUT>
* Provide the implementation for converting tuples from one format to the
* other
*
- * @param INPUT
- * tuple of certain format
+ * @param tuple tuple of certain format
* @return OUTPUT tuple of converted format
*/
public OUTPUT convert(INPUT tuple);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/converter/MapToKeyHashValuePairConverter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/converter/MapToKeyHashValuePairConverter.java b/library/src/main/java/com/datatorrent/lib/converter/MapToKeyHashValuePairConverter.java
index bb09cd8..52d3273 100644
--- a/library/src/main/java/com/datatorrent/lib/converter/MapToKeyHashValuePairConverter.java
+++ b/library/src/main/java/com/datatorrent/lib/converter/MapToKeyHashValuePairConverter.java
@@ -21,9 +21,9 @@ package com.datatorrent.lib.converter;
import java.util.Map;
import java.util.Map.Entry;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.util.KeyHashValPair;
/**
@@ -36,7 +36,8 @@ import com.datatorrent.lib.util.KeyHashValPair;
*
* @since 3.0.0
*/
-public class MapToKeyHashValuePairConverter<K, V> extends BaseOperator {
+public class MapToKeyHashValuePairConverter<K, V> extends BaseOperator
+{
/**
* Input port which accepts Map<K, V>.
@@ -46,8 +47,7 @@ public class MapToKeyHashValuePairConverter<K, V> extends BaseOperator {
@Override
public void process(Map<K, V> tuple)
{
- for(Entry<K, V> entry:tuple.entrySet())
- {
+ for (Entry<K, V> entry : tuple.entrySet()) {
output.emit(new KeyHashValPair<K, V>(entry.getKey(), entry.getValue()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/converter/MapToKeyValuePairConverter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/converter/MapToKeyValuePairConverter.java b/library/src/main/java/com/datatorrent/lib/converter/MapToKeyValuePairConverter.java
index f7f63ed..efab4a5 100644
--- a/library/src/main/java/com/datatorrent/lib/converter/MapToKeyValuePairConverter.java
+++ b/library/src/main/java/com/datatorrent/lib/converter/MapToKeyValuePairConverter.java
@@ -21,11 +21,10 @@ package com.datatorrent.lib.converter;
import java.util.Map;
import java.util.Map.Entry;
-import com.datatorrent.lib.util.KeyValPair;
-
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
/**
*
@@ -37,7 +36,8 @@ import com.datatorrent.api.DefaultOutputPort;
*
* @since 3.0.0
*/
-public class MapToKeyValuePairConverter<K, V> extends BaseOperator {
+public class MapToKeyValuePairConverter<K, V> extends BaseOperator
+{
/**
* Input port which accepts Map<K, V>.
@@ -47,8 +47,7 @@ public class MapToKeyValuePairConverter<K, V> extends BaseOperator {
@Override
public void process(Map<K, V> tuple)
{
- for(Entry<K, V> entry:tuple.entrySet())
- {
+ for (Entry<K, V> entry : tuple.entrySet()) {
output.emit(new KeyValPair<K, V>(entry.getKey(), entry.getValue()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMap.java b/library/src/main/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMap.java
index ad93868..e39259e 100644
--- a/library/src/main/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMap.java
+++ b/library/src/main/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMap.java
@@ -22,9 +22,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
/**
*
@@ -36,7 +36,8 @@ import com.datatorrent.api.DefaultOutputPort;
*
* @since 3.0.0
*/
-public class StringValueToNumberConverterForMap<K> extends BaseOperator {
+public class StringValueToNumberConverterForMap<K> extends BaseOperator
+{
/**
* Input port which accepts Map<K, Numeric String>.
@@ -47,8 +48,7 @@ public class StringValueToNumberConverterForMap<K> extends BaseOperator {
public void process(Map<K, String> tuple)
{
Map<K, Number> outputMap = new HashMap<K, Number>();
- for(Entry<K, String> entry:tuple.entrySet())
- {
+ for (Entry<K, String> entry : tuple.entrySet()) {
String val = entry.getValue();
if (val == null) {
return;
@@ -57,12 +57,10 @@ public class StringValueToNumberConverterForMap<K> extends BaseOperator {
boolean errortuple = false;
try {
tvalue = Double.parseDouble(val);
- }
- catch (NumberFormatException e) {
+ } catch (NumberFormatException e) {
errortuple = true;
}
- if(!errortuple)
- {
+ if (!errortuple) {
outputMap.put(entry.getKey(), tvalue);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/counters/BasicCounters.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/counters/BasicCounters.java b/library/src/main/java/com/datatorrent/lib/counters/BasicCounters.java
index 60e2d8e..255ec03 100644
--- a/library/src/main/java/com/datatorrent/lib/counters/BasicCounters.java
+++ b/library/src/main/java/com/datatorrent/lib/counters/BasicCounters.java
@@ -25,9 +25,6 @@ import java.util.Map;
import javax.annotation.Nonnull;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.JsonSerializer;
import org.codehaus.jackson.map.SerializerProvider;
@@ -35,8 +32,10 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.apache.commons.lang.mutable.Mutable;
-import com.datatorrent.api.Context;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
import com.datatorrent.common.util.NumberAggregate;
/**
@@ -142,7 +141,7 @@ public class BasicCounters<T extends Number & Mutable> implements Serializable
for (Object counter : objects) {
if (counter instanceof BasicCounters) {
@SuppressWarnings("unchecked")
- BasicCounters<T> physical = (BasicCounters<T>) counter;
+ BasicCounters<T> physical = (BasicCounters<T>)counter;
ImmutableMap<Enum<?>, T> copy = physical.getCopy();
for (Map.Entry<Enum<?>, T> entry : copy.entrySet()) {
@@ -178,7 +177,7 @@ public class BasicCounters<T extends Number & Mutable> implements Serializable
for (Object counter : objects) {
if (counter instanceof BasicCounters) {
@SuppressWarnings("unchecked")
- BasicCounters<T> physical = (BasicCounters<T>) counter;
+ BasicCounters<T> physical = (BasicCounters<T>)counter;
ImmutableMap<Enum<?>, T> copy = physical.getCopy();
for (Map.Entry<Enum<?>, T> entry : copy.entrySet()) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/AbstractAggregateTransactionableKeyValueStoreOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractAggregateTransactionableKeyValueStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractAggregateTransactionableKeyValueStoreOutputOperator.java
index ba89f7b..5225329 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractAggregateTransactionableKeyValueStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractAggregateTransactionableKeyValueStoreOutputOperator.java
@@ -34,6 +34,6 @@ package com.datatorrent.lib.db;
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
public abstract class AbstractAggregateTransactionableKeyValueStoreOutputOperator<T, S extends TransactionableKeyValueStore>
- extends AbstractAggregateTransactionableStoreOutputOperator<T, S>
+ extends AbstractAggregateTransactionableStoreOutputOperator<T, S>
{
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java
index 771e679..fbb924a 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java
@@ -35,10 +35,13 @@ import com.google.common.collect.Lists;
* @since 1.0.2
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
-public abstract class AbstractBatchTransactionableStoreOutputOperator<T, S extends TransactionableStore> extends AbstractAggregateTransactionableStoreOutputOperator<T, S> {
+public abstract class AbstractBatchTransactionableStoreOutputOperator<T, S extends TransactionableStore>
+ extends AbstractAggregateTransactionableStoreOutputOperator<T, S>
+{
private Collection<T> tuples;
- public AbstractBatchTransactionableStoreOutputOperator(){
+ public AbstractBatchTransactionableStoreOutputOperator()
+ {
tuples = Lists.newArrayList();
}
@@ -62,7 +65,8 @@ public abstract class AbstractBatchTransactionableStoreOutputOperator<T, S exten
public abstract void processBatch(Collection<T> tuples);
@Override
- public void storeAggregate() {
+ public void storeAggregate()
+ {
processBatch(tuples);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/AbstractKeyValueStoreInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractKeyValueStoreInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractKeyValueStoreInputOperator.java
index 9b6ddc2..f3f8c63 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractKeyValueStoreInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractKeyValueStoreInputOperator.java
@@ -18,7 +18,11 @@
*/
package com.datatorrent.lib.db;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
/**
* This is the base implementation of an input operator which consumes data from a key value store.
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableKeyValueStoreOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableKeyValueStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableKeyValueStoreOutputOperator.java
index f199986..fe828dc 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableKeyValueStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableKeyValueStoreOutputOperator.java
@@ -36,6 +36,6 @@ package com.datatorrent.lib.db;
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
public abstract class AbstractPassThruTransactionableKeyValueStoreOutputOperator<T, S extends TransactionableKeyValueStore>
- extends AbstractPassThruTransactionableStoreOutputOperator<T, S>
+ extends AbstractPassThruTransactionableStoreOutputOperator<T, S>
{
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java
index 55c8617..b471a63 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java
@@ -35,7 +35,8 @@ package com.datatorrent.lib.db;
* @since 0.9.3
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
-public abstract class AbstractPassThruTransactionableStoreOutputOperator<T, S extends TransactionableStore> extends AbstractTransactionableStoreOutputOperator<T, S>
+public abstract class AbstractPassThruTransactionableStoreOutputOperator<T, S extends TransactionableStore>
+ extends AbstractTransactionableStoreOutputOperator<T, S>
{
@Override
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/AbstractStoreInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractStoreInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractStoreInputOperator.java
index a6eb780..18e23f4 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractStoreInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractStoreInputOperator.java
@@ -41,7 +41,7 @@ public abstract class AbstractStoreInputOperator<T, S extends Connectable> imple
/**
* The output port on which tuples read from a store are emitted.
*/
- final public transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<T>();
+ public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<T>();
protected S store;
/**
* Gets the store.
@@ -79,8 +79,7 @@ public abstract class AbstractStoreInputOperator<T, S extends Connectable> imple
{
try {
store.connect();
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@@ -90,8 +89,7 @@ public abstract class AbstractStoreInputOperator<T, S extends Connectable> imple
{
try {
store.disconnect();
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
// ignore
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/AbstractStoreOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractStoreOutputOperator.java
index aff5b3e..2ef8d30 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractStoreOutputOperator.java
@@ -20,10 +20,10 @@ package com.datatorrent.lib.db;
import java.io.IOException;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
/**
* This is the base implementation of an output operator,
@@ -80,8 +80,7 @@ public abstract class AbstractStoreOutputOperator<T, S extends Connectable> exte
{
try {
store.connect();
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@@ -96,8 +95,7 @@ public abstract class AbstractStoreOutputOperator<T, S extends Connectable> exte
{
try {
store.disconnect();
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java
index 61682c2..037a8b2 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java
@@ -20,11 +20,11 @@ package com.datatorrent.lib.db;
import java.io.IOException;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
/**
* This is the base implementation of an output operator,
@@ -95,8 +95,7 @@ public abstract class AbstractTransactionableStoreOutputOperator<T, S extends Tr
appId = context.getValue(DAG.APPLICATION_ID);
operatorId = context.getId();
committedWindowId = store.getCommittedWindowId(appId, operatorId);
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@@ -115,8 +114,7 @@ public abstract class AbstractTransactionableStoreOutputOperator<T, S extends Tr
store.rollbackTransaction();
}
store.disconnect();
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcNonTransactionableBatchOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcNonTransactionableBatchOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcNonTransactionableBatchOutputOperator.java
index 03f7719..7dee870 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcNonTransactionableBatchOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcNonTransactionableBatchOutputOperator.java
@@ -18,15 +18,19 @@
*/
package com.datatorrent.lib.db.jdbc;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DAG;
-import com.google.common.collect.Lists;
import java.sql.SQLException;
import java.util.List;
+
import javax.validation.constraints.Min;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+
/**
* A generic output operator which updates the database without using transactions
* and batches writes to increase performance. This operator satisfies the exactly once constraint
@@ -38,7 +42,7 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AbstractJdbcNonTransactionableBatchOutputOperator<T, S extends JdbcNonTransactionalStore> extends AbstractJdbcNonTransactionableOutputOperator<T, S>
{
- private static transient final Logger LOG = LoggerFactory.getLogger(AbstractJdbcNonTransactionableBatchOutputOperator.class);
+ private static final transient Logger LOG = LoggerFactory.getLogger(AbstractJdbcNonTransactionableBatchOutputOperator.class);
public static final int DEFAULT_BATCH_SIZE = 1000;
@Min(1)
@@ -93,7 +97,7 @@ public abstract class AbstractJdbcNonTransactionableBatchOutputOperator<T, S ext
mode = context.getValue(OperatorContext.PROCESSING_MODE);
- if(mode==ProcessingMode.AT_MOST_ONCE){
+ if (mode == ProcessingMode.AT_MOST_ONCE) {
//Batch must be cleared to avoid writing same data twice
tuples.clear();
}
@@ -129,7 +133,7 @@ public abstract class AbstractJdbcNonTransactionableBatchOutputOperator<T, S ext
super.endWindow();
//This window is done so write it to the database.
- if(committedWindowId < currentWindowId) {
+ if (committedWindowId < currentWindowId) {
store.storeCommittedWindowId(appId, operatorId, currentWindowId);
committedWindowId = currentWindowId;
}
@@ -139,7 +143,7 @@ public abstract class AbstractJdbcNonTransactionableBatchOutputOperator<T, S ext
public void processTuple(T tuple)
{
//Minimize duplicated data in the at-least-once case
- if(committedWindowId >= currentWindowId) {
+ if (committedWindowId >= currentWindowId) {
return;
}
@@ -149,7 +153,7 @@ public abstract class AbstractJdbcNonTransactionableBatchOutputOperator<T, S ext
setStatementParameters(updateCommand, tuple);
updateCommand.addBatch();
- if(tuples.size() >= batchSize) {
+ if (tuples.size() >= batchSize) {
tuples.clear();
updateCommand.executeBatch();
updateCommand.clearBatch();