You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2016/03/29 00:39:45 UTC

[4/4] incubator-apex-malhar git commit: APEXMALHAR-1991 #resolve #comment Move Dimensions Computation Classes to org.apache.apex.malhar package

APEXMALHAR-1991 #resolve #comment Move Dimensions Computation Classes to org.apache.apex.malhar package


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c5cab8bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c5cab8bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c5cab8bd

Branch: refs/heads/master
Commit: c5cab8bd53913d76cbd0a6c37d56a9cca6968c89
Parents: 79eeff7
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Mon Mar 28 15:37:11 2016 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Mon Mar 28 15:37:11 2016 -0700

----------------------------------------------------------------------
 .../schemas/DimensionalConfigurationSchema.java |  13 +-
 .../lib/appdata/schemas/DimensionalSchema.java  |   5 +-
 .../db/jdbc/JDBCDimensionalOutputOperator.java  |   8 +-
 .../dimensions/CustomTimeBucketRegistry.java    | 141 ---
 .../dimensions/DimensionsConversionContext.java | 116 ---
 .../lib/dimensions/DimensionsDescriptor.java    | 422 ---------
 .../lib/dimensions/DimensionsEvent.java         | 844 ------------------
 .../AbstractIncrementalAggregator.java          | 190 -----
 .../dimensions/aggregator/AggregateEvent.java   |  38 -
 .../aggregator/AggregatorAverage.java           | 146 ----
 .../dimensions/aggregator/AggregatorCount.java  | 128 ---
 .../dimensions/aggregator/AggregatorCumSum.java | 233 -----
 .../dimensions/aggregator/AggregatorFirst.java  |  84 --
 .../aggregator/AggregatorIncrementalType.java   |  79 --
 .../dimensions/aggregator/AggregatorLast.java   |  84 --
 .../dimensions/aggregator/AggregatorMax.java    | 265 ------
 .../dimensions/aggregator/AggregatorMin.java    | 265 ------
 .../aggregator/AggregatorOTFType.java           |  89 --
 .../aggregator/AggregatorRegistry.java          | 424 ----------
 .../dimensions/aggregator/AggregatorSum.java    | 254 ------
 .../dimensions/aggregator/AggregatorUtils.java  | 148 ----
 .../aggregator/IncrementalAggregator.java       |  70 --
 .../dimensions/aggregator/OTFAggregator.java    |  84 --
 .../lib/dimensions/package-info.java            |  20 -
 .../dimensions/CustomTimeBucketRegistry.java    | 139 +++
 .../dimensions/DimensionsConversionContext.java | 116 +++
 .../lib/dimensions/DimensionsDescriptor.java    | 447 ++++++++++
 .../malhar/lib/dimensions/DimensionsEvent.java  | 848 +++++++++++++++++++
 .../AbstractIncrementalAggregator.java          | 191 +++++
 .../dimensions/aggregator/AggregateEvent.java   |  38 +
 .../aggregator/AggregatorAverage.java           | 146 ++++
 .../dimensions/aggregator/AggregatorCount.java  | 129 +++
 .../dimensions/aggregator/AggregatorCumSum.java | 234 +++++
 .../dimensions/aggregator/AggregatorFirst.java  |  85 ++
 .../aggregator/AggregatorIncrementalType.java   |  79 ++
 .../dimensions/aggregator/AggregatorLast.java   |  85 ++
 .../dimensions/aggregator/AggregatorMax.java    | 266 ++++++
 .../dimensions/aggregator/AggregatorMin.java    | 266 ++++++
 .../aggregator/AggregatorOTFType.java           |  89 ++
 .../aggregator/AggregatorRegistry.java          | 424 ++++++++++
 .../dimensions/aggregator/AggregatorSum.java    | 255 ++++++
 .../dimensions/aggregator/AggregatorUtils.java  | 148 ++++
 .../aggregator/IncrementalAggregator.java       |  71 ++
 .../dimensions/aggregator/OTFAggregator.java    |  84 ++
 .../malhar/lib/dimensions/package-info.java     |  20 +
 .../CustomTimeBucketRegistryTest.java           |   4 +-
 .../appdata/dimensions/DimensionsEventTest.java |   7 +-
 .../DimensionalConfigurationSchemaTest.java     |  23 +-
 .../appdata/schemas/DimensionalSchemaTest.java  |  15 +-
 .../CustomTimeBucketRegistryTest.java           |  87 --
 .../dimensions/DimensionsDescriptorTest.java    | 101 ---
 .../CustomTimeBucketRegistryTest.java           |  87 ++
 .../dimensions/DimensionsDescriptorTest.java    | 102 +++
 53 files changed, 4387 insertions(+), 4349 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java
index 28fa119..1e048c4 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java
@@ -26,6 +26,12 @@ import java.util.Set;
 
 import javax.validation.constraints.NotNull;
 
+import org.apache.apex.malhar.lib.dimensions.CustomTimeBucketRegistry;
+import org.apache.apex.malhar.lib.dimensions.DimensionsDescriptor;
+import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorRegistry;
+import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorUtils;
+import org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator;
+import org.apache.apex.malhar.lib.dimensions.aggregator.OTFAggregator;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -39,13 +45,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import com.datatorrent.lib.dimensions.CustomTimeBucketRegistry;
-import com.datatorrent.lib.dimensions.DimensionsDescriptor;
-import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry;
-import com.datatorrent.lib.dimensions.aggregator.AggregatorUtils;
-import com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator;
-import com.datatorrent.lib.dimensions.aggregator.OTFAggregator;
-
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.ints.IntArrayList;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
index 6639d3a..30f2c1e 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorRegistry;
+import org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -35,9 +37,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry;
-import com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator;
-
 /**
  * The {@link DimensionalSchema} class represents the App Data dimensions schema. The App Data dimensions
  * schema is built from two sources: a {@link DimensionalConfigurationSchema} and an optional schema stub. The

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
index 3021521..353f1b2 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
@@ -28,6 +28,10 @@ import java.util.Map;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
 
+import org.apache.apex.malhar.lib.dimensions.DimensionsDescriptor;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.EventKey;
+import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,10 +46,6 @@ import com.datatorrent.lib.appdata.schemas.DimensionalConfigurationSchema;
 import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
 import com.datatorrent.lib.appdata.schemas.Type;
 import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;
-import com.datatorrent.lib.dimensions.DimensionsDescriptor;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey;
-import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java b/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java
deleted file mode 100644
index 0e76509..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.common.base.Preconditions;
-
-import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.objects.Object2IntMap;
-import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
-
-/**
- * @since 3.3.0
- */
-public class CustomTimeBucketRegistry implements Serializable
-{
-  private static final long serialVersionUID = 201509221536L;
-
-  private int currentId;
-
-  private Int2ObjectMap<CustomTimeBucket> idToTimeBucket = new Int2ObjectOpenHashMap<>();
-  private Object2IntMap<CustomTimeBucket> timeBucketToId = new Object2IntOpenHashMap<>();
-  private Map<String, CustomTimeBucket> textToTimeBucket = new HashMap<>();
-
-  public CustomTimeBucketRegistry()
-  {
-  }
-
-  public CustomTimeBucketRegistry(int startingId)
-  {
-    this.currentId = startingId;
-  }
-
-  public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket)
-  {
-    initialize(idToTimeBucket);
-  }
-
-  public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket,
-      int startingId)
-  {
-    int tempId = initialize(idToTimeBucket);
-
-    Preconditions.checkArgument(tempId < startingId, "The statingId " + startingId
-        + " must be larger than the largest ID " + tempId
-        + " in the given idToTimeBucket mapping");
-
-    this.idToTimeBucket = Preconditions.checkNotNull(idToTimeBucket);
-    this.currentId = startingId;
-  }
-
-  private int initialize(Int2ObjectMap<CustomTimeBucket> idToTimeBucket)
-  {
-    Preconditions.checkNotNull(idToTimeBucket);
-
-    int tempId = Integer.MIN_VALUE;
-
-    for (int timeBucketId : idToTimeBucket.keySet()) {
-      tempId = Math.max(tempId, timeBucketId);
-      CustomTimeBucket customTimeBucket = idToTimeBucket.get(timeBucketId);
-      textToTimeBucket.put(customTimeBucket.getText(), customTimeBucket);
-      Preconditions.checkNotNull(customTimeBucket);
-      timeBucketToId.put(customTimeBucket, timeBucketId);
-    }
-
-    return tempId;
-  }
-
-  public CustomTimeBucket getTimeBucket(int timeBucketId)
-  {
-    return idToTimeBucket.get(timeBucketId);
-  }
-
-  public Integer getTimeBucketId(CustomTimeBucket timeBucket)
-  {
-    if (!timeBucketToId.containsKey(timeBucket)) {
-      return null;
-    }
-
-    return timeBucketToId.get(timeBucket);
-  }
-
-  public CustomTimeBucket getTimeBucket(String text)
-  {
-    return textToTimeBucket.get(text);
-  }
-
-  public void register(CustomTimeBucket timeBucket)
-  {
-    register(timeBucket, currentId);
-  }
-
-  public void register(CustomTimeBucket timeBucket, int timeBucketId)
-  {
-    if (timeBucketToId.containsKey(timeBucket)) {
-      throw new IllegalArgumentException("The timeBucket " + timeBucket + " is already registered.");
-    }
-
-    if (timeBucketToId.containsValue(timeBucketId)) {
-      throw new IllegalArgumentException("The timeBucketId " + timeBucketId + " is already registered.");
-    }
-
-    idToTimeBucket.put(timeBucketId, timeBucket);
-    timeBucketToId.put(timeBucket, timeBucketId);
-
-    if (timeBucketId >= currentId) {
-      currentId = timeBucketId + 1;
-    }
-
-    textToTimeBucket.put(timeBucket.getText(), timeBucket);
-  }
-
-  @Override
-  public String toString()
-  {
-    return "CustomTimeBucketRegistry{" + "idToTimeBucket=" + idToTimeBucket + '}';
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java
deleted file mode 100644
index dd598ff..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions;
-
-import java.io.Serializable;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils.IndexSubset;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-
-/**
- * This is a context object used to convert {@link InputEvent}s into aggregates
- * in {@link IncrementalAggregator}s.
- *
- * @since 3.3.0
- */
-public class DimensionsConversionContext implements Serializable
-{
-  private static final long serialVersionUID = 201506151157L;
-
-  public CustomTimeBucketRegistry customTimeBucketRegistry;
-  /**
-   * The schema ID for {@link Aggregate}s emitted by the
-   * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
-   * holding this context.
-   */
-  public int schemaID;
-  /**
-   * The dimensionsDescriptor ID for {@link Aggregate}s emitted by the
-   * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
-   * holding this context.
-   */
-  public int dimensionsDescriptorID;
-  /**
-   * The aggregator ID for {@link Aggregate}s emitted by the
-   * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
-   * holding this context.
-   */
-  public int aggregatorID;
-  /**
-   * The {@link DimensionsDescriptor} corresponding to the given dimension
-   * descriptor id.
-   */
-  public DimensionsDescriptor dd;
-  /**
-   * The {@link FieldsDescriptor} for the aggregate of the {@link Aggregate}s
-   * emitted by the
-   * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
-   * holding this context object.
-   */
-  public FieldsDescriptor aggregateDescriptor;
-  /**
-   * The {@link FieldsDescriptor} for the key of the {@link Aggregate}s emitted
-   * by the
-   * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
-   * holding this context object.
-   */
-  public FieldsDescriptor keyDescriptor;
-  /**
-   * The index of the timestamp field within the key of {@link InputEvent}s
-   * received by the
-   * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
-   * holding this context object. This is -1 if the {@link InputEvent} key has
-   * no timestamp.
-   */
-  public int inputTimestampIndex;
-  /**
-   * The index of the timestamp field within the key of {@link Aggregate}s
-   * emitted by the
-   * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
-   * holding this context object. This is -1 if the {@link Aggregate}'s key has
-   * no timestamp.
-   */
-  public int outputTimestampIndex;
-  /**
-   * The index of the time bucket field within the key of {@link Aggregate}s
-   * emitted by the
-   * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
-   * holding this context object. This is -1 if the {@link Aggregate}'s key has
-   * no timebucket.
-   */
-  public int outputTimebucketIndex;
-  /**
-   * The {@link IndexSubset} object that is used to extract key values from
-   * {@link InputEvent}s received by this aggregator.
-   */
-  public IndexSubset indexSubsetKeys;
-  /**
-   * The {@link IndexSubset} object that is used to extract aggregate values
-   * from {@link InputEvent}s received by this aggregator.
-   */
-  public IndexSubset indexSubsetAggregates;
-
-  /**
-   * Constructor for creating conversion context.
-   */
-  public DimensionsConversionContext()
-  {
-    //Do nothing.
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java
deleted file mode 100644
index a1b6f96..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java
+++ /dev/null
@@ -1,422 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
-import com.datatorrent.lib.appdata.schemas.Fields;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.TimeBucket;
-import com.datatorrent.lib.appdata.schemas.Type;
-
-/**
- * <p>
- * This class defines a dimensions combination which is used by dimensions computation operators
- * and stores. A dimension combination is composed of the names of the fields that constitute the key,
- * as well as the TimeBucket under which data is stored.
- * </p>
- * <p>
- * This class supports the creation of a dimensions combination from a {@link TimeBucket} object and a set of fields.
- * It also supports the creation of a dimensions combination an aggregation string. An aggregation string looks like
- * the following:
- * <br/>
- * <br/>
- * {@code
- * "time=MINUTES:publisher:advertiser"
- * }
- * <br/>
- * <br/>
- * In the example above <b>"time=MINUTES"</b> represents a time bucket, and the other colon separated strings represent
- * the name of fields which comprise the key for this dimension combination. When specifiying a time bucket in an
- * aggregation string you must use the name of one of the TimeUnit enums.
- * </p>
- * <p>
- * One of the primary uses of a {@link DimensionsDescriptor} is for querying a dimensional data store. When a query is
- * received for a dimensional data store, the query must be mapped to many things including a dimensionDescriptorID. The
- * dimensionDescriptorID is an id assigned to a class of dimension combinations which share the same keys. This
- * mapping is
- * performed by creating a
- * {@link DimensionsDescriptor} object from the query, and then using the {@link DimensionsDescriptor} object
- * to look up the correct dimensionsDescriptorID. This lookup to retrieve a dimensionsDescriptorID is necessary
- * because a
- * dimensionsDescriptorID is used for storage in order to prevent key conflicts.
- * </p>
- *
- *
- * @since 3.3.0
- */
-public class DimensionsDescriptor implements Serializable, Comparable<DimensionsDescriptor>
-{
-  private static final long serialVersionUID = 201506251237L;
-
-  /**
-   * Name of the reserved time field.
-   */
-  public static final String DIMENSION_TIME = "time";
-  /**
-   * Type of the reserved time field.
-   */
-  public static final Type DIMENSION_TIME_TYPE = Type.LONG;
-  /**
-   * Name of the reserved time bucket field.
-   */
-  public static final String DIMENSION_TIME_BUCKET = "timeBucket";
-  /**
-   * Type of the reserved time bucket field.
-   */
-  public static final Type DIMENSION_TIME_BUCKET_TYPE = Type.INTEGER;
-  /**
-   * The set of fields used for time, which are intended to be queried. Not that the
-   * timeBucket field is not included here because its not intended to be queried.
-   */
-  public static final Fields TIME_FIELDS = new Fields(Sets.newHashSet(DIMENSION_TIME));
-  /**
-   * This set represents the field names which cannot be part of the user defined field names in a schema for
-   * dimensions computation.
-   */
-  public static final Set<String> RESERVED_DIMENSION_NAMES = ImmutableSet.of(DIMENSION_TIME,
-      DIMENSION_TIME_BUCKET);
-  /**
-   * This is the equals string separator used when defining a time bucket for a dimensions combination.
-   */
-  public static final String DELIMETER_EQUALS = "=";
-  /**
-   * This separates dimensions in the dimensions combination.
-   */
-  public static final String DELIMETER_SEPERATOR = ":";
-  /**
-   * A map from a key field to its type.
-   */
-  public static final Map<String, Type> DIMENSION_FIELD_TO_TYPE;
-
-  /**
-   * The time bucket used for this dimension combination.
-   */
-  private TimeBucket timeBucket;
-  /**
-   * The custom time bucket used for this dimension combination.
-   */
-  private CustomTimeBucket customTimeBucket;
-  /**
-   * The set of key fields which compose this dimension combination.
-   */
-  private Fields fields;
-
-  static {
-    Map<String, Type> dimensionFieldToType = Maps.newHashMap();
-
-    dimensionFieldToType.put(DIMENSION_TIME, DIMENSION_TIME_TYPE);
-    dimensionFieldToType.put(DIMENSION_TIME_BUCKET, DIMENSION_TIME_BUCKET_TYPE);
-
-    DIMENSION_FIELD_TO_TYPE = Collections.unmodifiableMap(dimensionFieldToType);
-  }
-
-  /**
-   * Constructor for kryo serialization.
-   */
-  private DimensionsDescriptor()
-  {
-    //for kryo
-  }
-
-  /**
-   * Creates a dimensions descriptor (dimensions combination) with the given {@link TimeBucket} and key fields.
-   *
-   * @param timeBucket The {@link TimeBucket} that this dimensions combination represents.
-   * @param fields     The key fields included in this dimensions combination.
-   * @deprecated use
-   * {@link #DimensionsDescriptor(com.datatorrent.lib.appdata.schemas.CustomTimeBucket,
-   * com.datatorrent.lib.appdata.schemas.Fields)} instead.
-   */
-  @Deprecated
-  public DimensionsDescriptor(TimeBucket timeBucket,
-      Fields fields)
-  {
-    setTimeBucket(timeBucket);
-    setFields(fields);
-  }
-
-  /**
-   * Creates a dimensions descriptor (dimensions combination) with the given {@link CustomTimeBucket} and key fields.
-   *
-   * @param timeBucket The {@link CustomTimeBucket} that this dimensions combination represents.
-   * @param fields     The key fields included in this dimensions combination.
-   */
-  public DimensionsDescriptor(CustomTimeBucket timeBucket,
-      Fields fields)
-  {
-    setCustomTimeBucket(timeBucket);
-    setFields(fields);
-  }
-
-  /**
-   * Creates a dimensions descriptor (dimensions combination) with the given key fields.
-   *
-   * @param fields The key fields included in this dimensions combination.
-   */
-  public DimensionsDescriptor(Fields fields)
-  {
-    setFields(fields);
-  }
-
-  /**
-   * This construction creates a dimensions descriptor (dimensions combination) from the given aggregation string.
-   *
-   * @param aggregationString The aggregation string to use when initializing this dimensions combination.
-   */
-  public DimensionsDescriptor(String aggregationString)
-  {
-    initialize(aggregationString);
-  }
-
-  /**
-   * Initializes the dimensions combination with the given aggregation string.
-   *
-   * @param aggregationString The aggregation string with which to initialize this dimensions combination.
-   */
-  private void initialize(String aggregationString)
-  {
-    String[] fieldArray = aggregationString.split(DELIMETER_SEPERATOR);
-    Set<String> fieldSet = Sets.newHashSet();
-
-    for (String field : fieldArray) {
-      String[] fieldAndValue = field.split(DELIMETER_EQUALS);
-      String fieldName = fieldAndValue[0];
-
-      if (fieldName.equals(DIMENSION_TIME_BUCKET)) {
-        throw new IllegalArgumentException(DIMENSION_TIME_BUCKET + " is an invalid time.");
-      }
-
-      if (!fieldName.equals(DIMENSION_TIME)) {
-        fieldSet.add(fieldName);
-      }
-
-      if (fieldName.equals(DIMENSION_TIME)) {
-        if (timeBucket != null) {
-          throw new IllegalArgumentException("Cannot specify time in a dimensions "
-              + "descriptor when a timebucket is also "
-              + "specified.");
-        }
-
-        if (fieldAndValue.length == 2) {
-
-          timeBucket = TimeBucket.TIME_UNIT_TO_TIME_BUCKET.get(TimeUnit.valueOf(fieldAndValue[1]));
-        }
-      }
-    }
-
-    fields = new Fields(fieldSet);
-  }
-
-  /**
-   * This is a helper method which sets and validates the {@link TimeBucket}.
-   *
-   * @param timeBucket The {@link TimeBucket} to set and validate.
-   */
-  private void setTimeBucket(TimeBucket timeBucket)
-  {
-    Preconditions.checkNotNull(timeBucket);
-    this.timeBucket = timeBucket;
-    this.customTimeBucket = new CustomTimeBucket(timeBucket);
-  }
-
-  /**
-   * This is a helper method which sets and validates the {@link CustomTimeBucket}.
-   *
-   * @param customTimeBucket The {@link CustomTimeBucket} to set and validate.
-   */
-  private void setCustomTimeBucket(CustomTimeBucket customTimeBucket)
-  {
-    Preconditions.checkNotNull(customTimeBucket);
-    this.customTimeBucket = customTimeBucket;
-    this.timeBucket = customTimeBucket.getTimeBucket();
-  }
-
-  /**
-   * Gets the {@link TimeBucket} for this {@link DimensionsDescriptor} object.
-   *
-   * @return The {@link TimeBucket} for this {@link DimensionsDescriptor} object.
-   * @deprecated use {@link #getCustomTimeBucket()} instead.
-   */
-  @Deprecated
-  public TimeBucket getTimeBucket()
-  {
-    return timeBucket;
-  }
-
-  /**
-   * Gets the {@link CustomTimeBucket} for this {@link DimensionsDescriptor} object.
-   *
-   * @return The {@link CustomTimeBucket} for this {@link DimensionsDescriptor} object.
-   */
-  public CustomTimeBucket getCustomTimeBucket()
-  {
-    return customTimeBucket;
-  }
-
-  /**
-   * This is a helper method which sets and validates the set of key fields for this
-   * {@link DimensionsDescriptor} object.
-   *
-   * @param fields The set of key fields for this {@link DimensionsDescriptor} object.
-   */
-  private void setFields(Fields fields)
-  {
-    Preconditions.checkNotNull(fields);
-    this.fields = fields;
-  }
-
-  /**
-   * Returns the set of key fields for this {@link DimensionsDescriptor} object.
-   *
-   * @return The set of key fields for this {@link DimensionsDescriptor} object.
-   */
-  public Fields getFields()
-  {
-    return fields;
-  }
-
-  /**
-   * This method is used to create a new {@link FieldsDescriptor} object representing this
-   * {@link DimensionsDescriptor} object from another {@link FieldsDescriptor} object which
-   * defines the names and types of all the available key fields.
-   *
-   * @param parentDescriptor The {@link FieldsDescriptor} object which defines the name and
-   *                         type of all the available key fields.
-   * @return A {@link FieldsDescriptor} object which represents this {@link DimensionsDescriptor} (dimensions
-   * combination)
-   * derived from the given {@link FieldsDescriptor} object.
-   */
-  public FieldsDescriptor createFieldsDescriptor(FieldsDescriptor parentDescriptor)
-  {
-    Map<String, Type> fieldToType = Maps.newHashMap();
-    Map<String, Type> parentFieldToType = parentDescriptor.getFieldToType();
-
-    for (String field : this.fields.getFields()) {
-      if (RESERVED_DIMENSION_NAMES.contains(field)) {
-        continue;
-      }
-
-      fieldToType.put(field, parentFieldToType.get(field));
-    }
-
-    if (timeBucket != null && timeBucket != TimeBucket.ALL) {
-      fieldToType.put(DIMENSION_TIME_BUCKET, DIMENSION_TIME_BUCKET_TYPE);
-      fieldToType.put(DIMENSION_TIME, Type.LONG);
-    }
-
-    return new FieldsDescriptor(fieldToType);
-  }
-
-  @Override
-  public int hashCode()
-  {
-    int hash = 7;
-    hash = 83 * hash + (this.customTimeBucket != null ? this.customTimeBucket.hashCode() : 0);
-    hash = 83 * hash + (this.fields != null ? this.fields.hashCode() : 0);
-    return hash;
-  }
-
-  @Override
-  public boolean equals(Object obj)
-  {
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
-      return false;
-    }
-    final DimensionsDescriptor other = (DimensionsDescriptor)obj;
-    if (!this.customTimeBucket.equals(other.customTimeBucket)) {
-      return false;
-    }
-    if (this.fields != other.fields && (this.fields == null || !this.fields.equals(other.fields))) {
-      return false;
-    }
-    return true;
-  }
-
-  @Override
-  public String toString()
-  {
-    return "DimensionsDescriptor{" + "timeBucket=" + customTimeBucket + ", fields=" + fields + '}';
-  }
-
-  @Override
-  public int compareTo(DimensionsDescriptor other)
-  {
-    if (this == other) {
-      return 0;
-    }
-
-    List<String> thisFieldList = this.getFields().getFieldsList();
-    List<String> otherFieldList = other.getFields().getFieldsList();
-
-    if (thisFieldList != otherFieldList) {
-      int compare = thisFieldList.size() - otherFieldList.size();
-
-      if (compare != 0) {
-        return compare;
-      }
-
-      Collections.sort(thisFieldList);
-      Collections.sort(otherFieldList);
-
-      for (int index = 0; index < thisFieldList.size(); index++) {
-        String thisField = thisFieldList.get(index);
-        String otherField = otherFieldList.get(index);
-
-        int fieldCompare = thisField.compareTo(otherField);
-
-        if (fieldCompare != 0) {
-          return fieldCompare;
-        }
-      }
-    }
-
-    CustomTimeBucket thisBucket = this.getCustomTimeBucket();
-    CustomTimeBucket otherBucket = other.getCustomTimeBucket();
-
-    if (thisBucket == null && otherBucket == null) {
-      return 0;
-    } else if (thisBucket != null && otherBucket == null) {
-      return 1;
-    } else if (thisBucket == null && otherBucket != null) {
-      return -1;
-    } else {
-      return thisBucket.compareTo(otherBucket);
-    }
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(DimensionsDescriptor.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java
deleted file mode 100644
index b12b631..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java
+++ /dev/null
@@ -1,844 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions;
-
-import java.io.Serializable;
-import java.util.List;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.dimensions.aggregator.AggregateEvent;
-
-/**
- * <p>
- * This is the base class for the events that are used for internal processing in the subclasses of
- * {@link AbstractDimensionsComputationFlexible} and {@link DimensionsStoreHDHT}.
- * </p>
- * <p>
- * A {@link DimensionsEvent} is constructed from two parts: an {@link EventKey} and a {@link GPOMutable} object
- * which contains the values of aggregate fields. The {@link EventKey} is used to identify the dimension combination
- * an event belongs to, and consequently determines what input values should be aggregated together. The aggregates
- * are the actual data payload of the event which are to be aggregated.
- * </p>
- *
- * @since 3.1.0
- */
-public class DimensionsEvent implements Serializable
-{
-  private static final long serialVersionUID = 201503231204L;
-
-  /**
-   * This is the {@link GPOMutable} object which holds all the aggregates.
-   */
-  protected GPOMutable aggregates;
-  /**
-   * This is the event key for the event.
-   */
-  protected EventKey eventKey;
-
-  /**
-   * Constructor for Kryo.
-   */
-  private DimensionsEvent()
-  {
-    //For kryo
-  }
-
-  /**
-   * This creates a {@link DimensionsEvent} from the given event key and aggregates.
-   *
-   * @param eventKey   The key from which to create a {@link DimensionsEvent}.
-   * @param aggregates The aggregates from which to create {@link DimensionsEvent}.
-   */
-  public DimensionsEvent(EventKey eventKey,
-      GPOMutable aggregates)
-  {
-    setEventKey(eventKey);
-    setAggregates(aggregates);
-  }
-
-  /**
-   * Creates a DimensionsEvent with the given key values, aggregates and ids.
-   *
-   * @param keys                  The values for fields in the key.
-   * @param aggregates            The values for fields in the aggregate.
-   * @param bucketID              The bucketID
-   * @param schemaID              The schemaID.
-   * @param dimensionDescriptorID The dimensionsDescriptorID.
-   * @param aggregatorIndex       The aggregatorIndex assigned to this event by the unifier.
-   */
-  public DimensionsEvent(GPOMutable keys,
-      GPOMutable aggregates,
-      int bucketID,
-      int schemaID,
-      int dimensionDescriptorID,
-      int aggregatorIndex)
-  {
-    this.eventKey = new EventKey(bucketID,
-        schemaID,
-        dimensionDescriptorID,
-        aggregatorIndex,
-        keys);
-    setAggregates(aggregates);
-  }
-
-  /**
-   * This creates an event with the given data. Note, this constructor assumes that the bucketID will be 0.
-   *
-   * @param keys                  The value for fields in the key.
-   * @param aggregates            The value for fields in the aggregate.
-   * @param schemaID              The schemaID.
-   * @param dimensionDescriptorID The dimensionsDescriptorID.
-   * @param aggregatorIndex       The aggregatorIndex assigned to this event by the unifier.
-   */
-  public DimensionsEvent(GPOMutable keys,
-      GPOMutable aggregates,
-      int schemaID,
-      int dimensionDescriptorID,
-      int aggregatorIndex)
-  {
-    this.eventKey = new EventKey(schemaID,
-        dimensionDescriptorID,
-        aggregatorIndex,
-        keys);
-    setAggregates(aggregates);
-  }
-
-  /**
-   * This is a helper method which sets the {@link EventKey} of the event to
-   * be the same as the given {@link EventKey}.
-   *
-   * @param eventKey The {@link EventKey} to set on this event.
-   */
-  protected final void setEventKey(EventKey eventKey)
-  {
-    this.eventKey = new EventKey(eventKey);
-  }
-
-  /**
-   * This is a helper method which sets the aggregates for this event.
-   *
-   * @param aggregates The aggregates for this event.
-   */
-  protected final void setAggregates(GPOMutable aggregates)
-  {
-    Preconditions.checkNotNull(aggregates);
-    this.aggregates = aggregates;
-  }
-
-  /**
-   * This is a helper method which returns the aggregates for this event.
-   *
-   * @return The helper method which returns the aggregates for this event.
-   */
-  public GPOMutable getAggregates()
-  {
-    return aggregates;
-  }
-
-  /**
-   * Returns the {@link EventKey} for this event.
-   *
-   * @return The {@link EventKey} for this event.
-   */
-  public EventKey getEventKey()
-  {
-    return eventKey;
-  }
-
-  /**
-   * This is a convenience method which returns the values of the key fields in this event's
-   * {@link EventKey}.
-   *
-   * @return The values of the key fields in this event's {@link EventKey}.
-   */
-  public GPOMutable getKeys()
-  {
-    return eventKey.getKey();
-  }
-
-  /**
-   * This is a convenience method which returns the schemaID of this event's {@link EventKey}.
-   *
-   * @return The schemaID of this event's {@link EventKey}.
-   */
-  public int getSchemaID()
-  {
-    return eventKey.getSchemaID();
-  }
-
-  /**
-   * Returns the id of the dimension descriptor (key combination) for which this event contains data.
-   *
-   * @return The id of the dimension descriptor (key combination) for which this event contains data.
-   */
-  public int getDimensionDescriptorID()
-  {
-    return eventKey.getDimensionDescriptorID();
-  }
-
-  /**
-   * Returns the id of the aggregator which is applied to this event's data.
-   *
-   * @return Returns the id of the aggregator which is applied to this event's data.
-   */
-  public int getAggregatorID()
-  {
-    return eventKey.getAggregatorID();
-  }
-
-  /**
-   * Returns the bucketID assigned to this event. The bucketID is useful for this event in the case that the event
-   * is sent to a partitioned HDHT operator. Each partitioned HDHT operator can use the bucketIDs for the buckets it
-   * writes to as a partition key.
-   *
-   * @return The bucketID assigned to this event.
-   */
-  public int getBucketID()
-  {
-    return eventKey.getBucketID();
-  }
-
-  /**
-   * This is a utility method which copies the given src event to the given destination event.
-   *
-   * @param aeDest The destination event.
-   * @param aeSrc  The source event.
-   */
-  public static void copy(DimensionsEvent aeDest, DimensionsEvent aeSrc)
-  {
-    GPOMutable destAggs = aeDest.getAggregates();
-    GPOMutable srcAggs = aeSrc.getAggregates();
-
-    if (srcAggs.getFieldsBoolean() != null) {
-      System.arraycopy(srcAggs.getFieldsBoolean(), 0, destAggs.getFieldsBoolean(), 0,
-          srcAggs.getFieldsBoolean().length);
-    }
-
-    if (srcAggs.getFieldsCharacter() != null) {
-      System.arraycopy(srcAggs.getFieldsCharacter(), 0, destAggs.getFieldsCharacter(), 0,
-          srcAggs.getFieldsCharacter().length);
-    }
-
-    if (srcAggs.getFieldsString() != null) {
-      System.arraycopy(srcAggs.getFieldsString(), 0, destAggs.getFieldsString(), 0, srcAggs.getFieldsString().length);
-    }
-
-    if (srcAggs.getFieldsShort() != null) {
-      System.arraycopy(srcAggs.getFieldsShort(), 0, destAggs.getFieldsShort(), 0, srcAggs.getFieldsShort().length);
-    }
-
-    if (srcAggs.getFieldsInteger() != null) {
-      System.arraycopy(srcAggs.getFieldsInteger(), 0, destAggs.getFieldsInteger(), 0,
-          srcAggs.getFieldsInteger().length);
-    }
-
-    if (srcAggs.getFieldsLong() != null) {
-      System.arraycopy(srcAggs.getFieldsLong(), 0, destAggs.getFieldsLong(), 0, srcAggs.getFieldsLong().length);
-    }
-
-    if (srcAggs.getFieldsFloat() != null) {
-      System.arraycopy(srcAggs.getFieldsFloat(), 0, destAggs.getFieldsFloat(), 0, srcAggs.getFieldsFloat().length);
-    }
-
-    if (srcAggs.getFieldsDouble() != null) {
-      System.arraycopy(srcAggs.getFieldsDouble(), 0, destAggs.getFieldsDouble(), 0, srcAggs.getFieldsDouble().length);
-    }
-  }
-
-  /**
-   * <p>
-   * The {@link EventKey} represents a dimensions combination for a dimensions event. It contains the keys and values
-   * which define a dimensions combination. It's very similar to a {@link DimensionsDescriptor} which is also used to
-   * define part of a dimensions combination. The difference between the two is that a {@link DimensionsDescriptor}
-   * only contains what
-   * keys are included in the combination, not the values of those keys (which the {@link EventKey} has.
-   * </p>
-   * <p>
-   * In addition to holding the keys in a dimensions combination and their values, the event key holds some meta
-   * information.
-   * The meta information included and their purposes are the following:
-   * <ul>
-   * <li><b>bucketID:</b> This is set when the dimension store responsible for storing the data is partitioned. In that
-   * case the bucketID is used as the partitionID.</li>
-   * <li><b>schemaID:</b> This is the id of the {@link DimensionalSchema} that this {@link EventKey} corresponds to
-   * .</li>
-   * <li><b>dimensionDescriptorID:</b> This is the id of the {@link DimensionsDescriptor} that this {@link EventKey}
-   * corresponds to.</li>
-   * <li><b>aggregatorID:</b> This is the id of the aggregator that is used to aggregate the values associated with
-   * this {@link EventKey}
-   * in a {@link DimensionsEvent}.</li>
-   * </ul>
-   * </p>
-   */
-  public static class EventKey implements Serializable
-  {
-    private static final long serialVersionUID = 201503231205L;
-
-    /**
-     * The bucketID assigned to this event key.
-     */
-    private int bucketID;
-    /**
-     * The schemaID corresponding to the {@link DimensionalSchema} that this {@link EventKey}
-     * corresponds to.
-     */
-    private int schemaID;
-    /**
-     * The dimensionsDescriptorID of the {@link DimensionDescriptor} in the corresponding {@link DimensionalSchema}.
-     */
-    private int dimensionDescriptorID;
-    /**
-     * The id of the aggregator which should be used to aggregate the values corresponding to this
-     * {@link EventKey}.
-     */
-    private int aggregatorID;
-    /**
-     * The values of the key fields.
-     */
-    private GPOMutable key;
-
-    /**
-     * Constructor for serialization.
-     */
-    private EventKey()
-    {
-      //For kryo
-    }
-
-    /**
-     * Copy constructor.
-     *
-     * @param eventKey The {@link EventKey} whose data will be copied.
-     */
-    public EventKey(EventKey eventKey)
-    {
-      this.bucketID = eventKey.bucketID;
-      this.schemaID = eventKey.schemaID;
-      this.dimensionDescriptorID = eventKey.dimensionDescriptorID;
-      this.aggregatorID = eventKey.aggregatorID;
-
-      this.key = new GPOMutable(eventKey.getKey());
-    }
-
-    /**
-     * Creates an event key with the given data.
-     *
-     * @param bucketID              The bucketID assigned to this {@link EventKey}.
-     * @param schemaID              The schemaID of the corresponding {@link DimensionalSchema}.
-     * @param dimensionDescriptorID The dimensionDescriptorID of the corresponding
-     * {@link DimensionDescriptor} in the {@link DimensionalSchema}.
-     * @param aggregatorID          The id of the aggregator which should be used to aggregate the values
-     *                              corresponding to this
-     *                              {@link EventKey}.
-     * @param key                   The values of the keys.
-     */
-    public EventKey(int bucketID,
-        int schemaID,
-        int dimensionDescriptorID,
-        int aggregatorID,
-        GPOMutable key)
-    {
-      setBucketID(bucketID);
-      setSchemaID(schemaID);
-      setDimensionDescriptorID(dimensionDescriptorID);
-      setAggregatorID(aggregatorID);
-      setKey(key);
-    }
-
-    /**
-     * Creates an event key with the given data. This constructor assumes that the bucketID will be 0.
-     *
-     * @param schemaID              The schemaID of the corresponding {@link DimensionalSchema}.
-     * @param dimensionDescriptorID The dimensionDescriptorID of the corresponding {@link DimensionDescriptor}.
-     * @param aggregatorID          The id of the aggregator which should be used to aggregate the values
-     *                              corresponding to this
-     *                              {@link EventKey}.
-     * @param key                   The values of the keys.
-     */
-    public EventKey(int schemaID,
-        int dimensionDescriptorID,
-        int aggregatorID,
-        GPOMutable key)
-    {
-      setSchemaID(schemaID);
-      setDimensionDescriptorID(dimensionDescriptorID);
-      setAggregatorID(aggregatorID);
-      setKey(key);
-    }
-
-    /**
-     * Sets the dimension descriptor ID.
-     *
-     * @param dimensionDescriptorID The dimension descriptor ID to set.
-     */
-    private void setDimensionDescriptorID(int dimensionDescriptorID)
-    {
-      this.dimensionDescriptorID = dimensionDescriptorID;
-    }
-
-    /**
-     * Returns the dimension descriptor ID.
-     *
-     * @return The dimension descriptor ID.
-     */
-    public int getDimensionDescriptorID()
-    {
-      return dimensionDescriptorID;
-    }
-
-    /**
-     * Returns the aggregatorID.
-     *
-     * @return The aggregatorID.
-     */
-    public int getAggregatorID()
-    {
-      return aggregatorID;
-    }
-
-    /**
-     * Sets the aggregatorID.
-     *
-     * @param aggregatorID The aggregatorID to set.
-     */
-    private void setAggregatorID(int aggregatorID)
-    {
-      this.aggregatorID = aggregatorID;
-    }
-
-    /**
-     * Returns the schemaID.
-     *
-     * @return The schemaID to set.
-     */
-    public int getSchemaID()
-    {
-      return schemaID;
-    }
-
-    /**
-     * Sets the schemaID.
-     *
-     * @param schemaID The schemaID to set.
-     */
-    private void setSchemaID(int schemaID)
-    {
-      this.schemaID = schemaID;
-    }
-
-    /**
-     * Returns the key values.
-     *
-     * @return The key values.
-     */
-    public GPOMutable getKey()
-    {
-      return key;
-    }
-
-    /**
-     * Sets the bucektID.
-     *
-     * @param bucketID The bucketID.
-     */
-    private void setBucketID(int bucketID)
-    {
-      this.bucketID = bucketID;
-    }
-
-    /**
-     * Gets the bucketID.
-     *
-     * @return The bucketID.
-     */
-    public int getBucketID()
-    {
-      return bucketID;
-    }
-
-    /**
-     * Sets the key values.
-     *
-     * @param key The key values to set.
-     */
-    private void setKey(GPOMutable key)
-    {
-      Preconditions.checkNotNull(key);
-      this.key = key;
-    }
-
-    @Override
-    public int hashCode()
-    {
-      int hash = 3;
-      hash = 97 * hash + this.bucketID;
-      hash = 97 * hash + this.schemaID;
-      hash = 97 * hash + this.dimensionDescriptorID;
-      hash = 97 * hash + this.aggregatorID;
-      hash = 97 * hash + (this.key != null ? this.key.hashCode() : 0);
-      return hash;
-    }
-
-    @Override
-    public boolean equals(Object obj)
-    {
-      if (obj == null) {
-        return false;
-      }
-
-      if (getClass() != obj.getClass()) {
-        return false;
-      }
-
-      final EventKey other = (EventKey)obj;
-
-      if (this.bucketID != other.bucketID) {
-        return false;
-      }
-
-      if (this.schemaID != other.schemaID) {
-        return false;
-      }
-
-      if (this.dimensionDescriptorID != other.dimensionDescriptorID) {
-        return false;
-      }
-
-      if (this.aggregatorID != other.aggregatorID) {
-        return false;
-      }
-
-      if (this.key != other.key && (this.key == null || !this.key.equals(other.key))) {
-        return false;
-      }
-
-      return true;
-    }
-
-    @Override
-    public String toString()
-    {
-      return "EventKey{" + "schemaID=" + schemaID + ", dimensionDescriptorID=" + dimensionDescriptorID +
-          ", aggregatorIndex=" + aggregatorID + ", key=" + key + '}';
-    }
-
-    public static List<EventKey> createEventKeys(int schemaId,
-        int dimensionsDescriptorId,
-        int aggregatorId,
-        List<GPOMutable> keys)
-    {
-      List<EventKey> eventKeys = Lists.newArrayList();
-
-      for (GPOMutable key : keys) {
-        eventKeys.add(new EventKey(schemaId, dimensionsDescriptorId, aggregatorId, key));
-      }
-
-      return eventKeys;
-    }
-  }
-
-  @Override
-  public int hashCode()
-  {
-    int hash = 5;
-    hash = 79 * hash + (this.aggregates != null ? this.aggregates.hashCode() : 0);
-    hash = 79 * hash + (this.eventKey != null ? this.eventKey.hashCode() : 0);
-    return hash;
-  }
-
-  @Override
-  public boolean equals(Object obj)
-  {
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
-      return false;
-    }
-    final DimensionsEvent other = (DimensionsEvent)obj;
-    if (this.aggregates != other.aggregates && (this.aggregates == null || !this.aggregates.equals(other.aggregates))) {
-      return false;
-    }
-    if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) {
-      return false;
-    }
-    return true;
-  }
-
-  public static class InputEvent extends DimensionsEvent
-  {
-    private static final long serialVersionUID = 201506210406L;
-    public boolean used = false;
-
-    private InputEvent()
-    {
-    }
-
-    /**
-     * This creates a {@link DimensionsEvent} from the given event key and aggregates.
-     *
-     * @param eventKey   The key from which to create a {@link DimensionsEvent}.
-     * @param aggregates The aggregates from which to create {@link DimensionsEvent}.
-     */
-    public InputEvent(EventKey eventKey,
-        GPOMutable aggregates)
-    {
-      setEventKey(eventKey);
-      setAggregates(aggregates);
-    }
-
-    /**
-     * Creates a DimensionsEvent with the given key values, aggregates and ids.
-     *
-     * @param keys                  The values for fields in the key.
-     * @param aggregates            The values for fields in the aggregate.
-     * @param bucketID              The bucketID
-     * @param schemaID              The schemaID.
-     * @param dimensionDescriptorID The dimensionsDescriptorID.
-     * @param aggregatorIndex       The aggregatorIndex assigned to this event by the unifier.
-     */
-    public InputEvent(GPOMutable keys,
-        GPOMutable aggregates,
-        int bucketID,
-        int schemaID,
-        int dimensionDescriptorID,
-        int aggregatorIndex)
-    {
-      this.eventKey = new EventKey(bucketID,
-          schemaID,
-          dimensionDescriptorID,
-          aggregatorIndex,
-          keys);
-      setAggregates(aggregates);
-    }
-
-    /**
-     * This creates an event with the given data. Note, this constructor assumes that the bucketID will be 0.
-     *
-     * @param keys                  The value for fields in the key.
-     * @param aggregates            The value for fields in the aggregate.
-     * @param schemaID              The schemaID.
-     * @param dimensionDescriptorID The dimensionsDescriptorID.
-     * @param aggregatorIndex       The aggregatorIndex assigned to this event by the unifier.
-     */
-    public InputEvent(GPOMutable keys,
-        GPOMutable aggregates,
-        int schemaID,
-        int dimensionDescriptorID,
-        int aggregatorIndex)
-    {
-      this.eventKey = new EventKey(schemaID,
-          dimensionDescriptorID,
-          aggregatorIndex,
-          keys);
-      setAggregates(aggregates);
-    }
-
-    @Override
-    public int hashCode()
-    {
-      return GPOUtils.hashcode(this.getKeys());
-    }
-
-    @Override
-    public boolean equals(Object obj)
-    {
-      if (obj == null) {
-        return false;
-      }
-      if (getClass() != obj.getClass()) {
-        return false;
-      }
-      final DimensionsEvent other = (DimensionsEvent)obj;
-
-      if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) {
-        return false;
-      }
-      return true;
-    }
-  }
-
-  public static class Aggregate extends DimensionsEvent implements AggregateEvent
-  {
-    private static final long serialVersionUID = 201506190110L;
-
-    /**
-     * This is the aggregatorIndex assigned to this event.
-     */
-    protected int aggregatorIndex;
-    private GPOMutable metaData;
-
-    public Aggregate()
-    {
-      //for kryo and for extending classes
-    }
-
-    /**
-     * This creates a {@link DimensionsEvent} from the given event key and aggregates.
-     *
-     * @param eventKey   The key from which to create a {@link DimensionsEvent}.
-     * @param aggregates The aggregates from which to create {@link DimensionsEvent}.
-     */
-    public Aggregate(EventKey eventKey,
-        GPOMutable aggregates)
-    {
-      setEventKey(eventKey);
-      setAggregates(aggregates);
-    }
-
-    public Aggregate(EventKey eventKey,
-        GPOMutable aggregates,
-        GPOMutable metaData)
-    {
-      super(eventKey,
-          aggregates);
-
-      this.metaData = metaData;
-    }
-
-    /**
-     * Creates a DimensionsEvent with the given key values, aggregates and ids.
-     *
-     * @param keys                  The values for fields in the key.
-     * @param aggregates            The values for fields in the aggregate.
-     * @param bucketID              The bucketID
-     * @param schemaID              The schemaID.
-     * @param dimensionDescriptorID The dimensionsDescriptorID.
-     * @param aggregatorIndex       The aggregatorIndex assigned to this event by the unifier.
-     */
-    public Aggregate(GPOMutable keys,
-        GPOMutable aggregates,
-        int bucketID,
-        int schemaID,
-        int dimensionDescriptorID,
-        int aggregatorIndex)
-    {
-      this.eventKey = new EventKey(bucketID,
-          schemaID,
-          dimensionDescriptorID,
-          aggregatorIndex,
-          keys);
-      setAggregates(aggregates);
-    }
-
-    public Aggregate(GPOMutable keys,
-        GPOMutable aggregates,
-        GPOMutable metaData,
-        int bucketID,
-        int schemaID,
-        int dimensionDescriptorID,
-        int aggregatorIndex)
-    {
-      this(keys,
-          aggregates,
-          bucketID,
-          schemaID,
-          dimensionDescriptorID,
-          aggregatorIndex);
-
-      this.metaData = metaData;
-    }
-
-    /**
-     * This creates an event with the given data. Note, this constructor assumes that the bucketID will be 0.
-     *
-     * @param keys                  The value for fields in the key.
-     * @param aggregates            The value for fields in the aggregate.
-     * @param schemaID              The schemaID.
-     * @param dimensionDescriptorID The dimensionsDescriptorID.
-     * @param aggregatorIndex       The aggregatorIndex assigned to this event by the unifier.
-     */
-    public Aggregate(GPOMutable keys,
-        GPOMutable aggregates,
-        int schemaID,
-        int dimensionDescriptorID,
-        int aggregatorIndex)
-    {
-      this.eventKey = new EventKey(schemaID,
-          dimensionDescriptorID,
-          aggregatorIndex,
-          keys);
-      setAggregates(aggregates);
-    }
-
-    public Aggregate(GPOMutable keys,
-        GPOMutable aggregates,
-        GPOMutable metaData,
-        int schemaID,
-        int dimensionDescriptorID,
-        int aggregatorIndex)
-    {
-      this(keys,
-          aggregates,
-          schemaID,
-          dimensionDescriptorID,
-          aggregatorIndex);
-
-      this.metaData = metaData;
-    }
-
-    public void setMetaData(GPOMutable metaData)
-    {
-      this.metaData = metaData;
-    }
-
-    public GPOMutable getMetaData()
-    {
-      return metaData;
-    }
-
-    public void setAggregatorIndex(int aggregatorIndex)
-    {
-      this.aggregatorIndex = aggregatorIndex;
-    }
-
-    @Override
-    public int getAggregatorIndex()
-    {
-      return aggregatorIndex;
-    }
-
-    @Override
-    public int hashCode()
-    {
-      return GPOUtils.hashcode(this.getKeys());
-    }
-
-    @Override
-    public boolean equals(Object obj)
-    {
-      if (obj == null) {
-        return false;
-      }
-      if (getClass() != obj.getClass()) {
-        return false;
-      }
-      final DimensionsEvent other = (DimensionsEvent)obj;
-
-      if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) {
-        return false;
-      }
-      return true;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java
deleted file mode 100644
index f92bfbf..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
-import com.datatorrent.lib.dimensions.DimensionsConversionContext;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-
-/**
- * * <p>
- * {@link IncrementalAggregator}s perform aggregations in place, on a field by field basis. For example if we have a
- * field cost, an incremental aggregator would take a new value of cost and aggregate it to an aggregate value for
- * cost. No fields except the cost field are used in the computation of the cost aggregation in the case of an
- * {@link IncrementalAggregator}.
- * </p>
- * <p>
- * {@link IncrementalAggregator}s are intended to be used with subclasses of
- * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema}. The way in which
- * {@link IncrementalAggregator}s are used in this context is that a batch of fields to be aggregated by the aggregator
- * are provided in the form of an {@link InputEvent}. For example, if there are two fields (cost and revenue), which
- * will be aggregated by a sum aggregator, both of those fields will be included in the {@link InputEvent} passed to
- * the sum aggregator. And the {DimensionsEventregate} event produced by the sum aggregator will contain two fields,
- * one for cost and one for revenue.
- * </p>
- * 
- */
-public abstract class AbstractIncrementalAggregator implements IncrementalAggregator
-{
-  private static final long serialVersionUID = 201506211153L;
-
-  /**
-   * The conversion context for this aggregator.
-   */
-  protected DimensionsConversionContext context;
-
-  public AbstractIncrementalAggregator()
-  {
-  }
-
-  @Override
-  public void setDimensionsConversionContext(DimensionsConversionContext context)
-  {
-    this.context = Preconditions.checkNotNull(context);
-  }
-
-  @Override
-  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
-  {
-    src.used = true;
-    Aggregate aggregate = createAggregate(src,
-        context,
-        aggregatorIndex);
-    return aggregate;
-  }
-
-  @Override
-  public int hashCode(InputEvent inputEvent)
-  {
-    long timestamp = -1L;
-    boolean hasTime = this.context.inputTimestampIndex != -1
-        && this.context.outputTimebucketIndex != -1;
-
-    if (hasTime) {
-      timestamp = inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex];
-      inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex]
-          = this.context.dd.getCustomTimeBucket().roundDown(timestamp);
-    }
-
-    int hashCode = GPOUtils.indirectHashcode(inputEvent.getKeys(), context.indexSubsetKeys);
-
-    if (hasTime) {
-      inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex] = timestamp;
-    }
-
-    return hashCode;
-  }
-
-  @Override
-  public boolean equals(InputEvent inputEvent1, InputEvent inputEvent2)
-  {
-    long timestamp1 = 0;
-    long timestamp2 = 0;
-
-    if (context.inputTimestampIndex != -1) {
-      timestamp1 = inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex];
-      inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex] =
-          context.dd.getCustomTimeBucket().roundDown(timestamp1);
-
-      timestamp2 = inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex];
-      inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex] =
-          context.dd.getCustomTimeBucket().roundDown(timestamp2);
-    }
-
-    boolean equals = GPOUtils.subsetEquals(inputEvent2.getKeys(),
-        inputEvent1.getKeys(),
-        context.indexSubsetKeys);
-
-    if (context.inputTimestampIndex != -1) {
-      inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp1;
-      inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp2;
-    }
-
-    return equals;
-  }
-
-  /**
-   * Creates an {@link Aggregate} from the given {@link InputEvent}.
-   *
-   * @param inputEvent      The {@link InputEvent} to unpack into an {@link Aggregate}.
-   * @param context         The conversion context required to transform the {@link InputEvent} into
-   *                        the correct {@link Aggregate}.
-   * @param aggregatorIndex The aggregatorIndex assigned to this {@link Aggregate}.
-   * @return The converted {@link Aggregate}.
-   */
-  public static Aggregate createAggregate(InputEvent inputEvent,
-      DimensionsConversionContext context,
-      int aggregatorIndex)
-  {
-    GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor);
-    EventKey eventKey = createEventKey(inputEvent,
-        context,
-        aggregatorIndex);
-
-    Aggregate aggregate = new Aggregate(eventKey,
-        aggregates);
-    aggregate.setAggregatorIndex(aggregatorIndex);
-
-    return aggregate;
-  }
-
-  /**
-   * Creates an {@link EventKey} from the given {@link InputEvent}.
-   *
-   * @param inputEvent      The {@link InputEvent} to extract an {@link EventKey} from.
-   * @param context         The conversion context required to extract the {@link EventKey} from
-   *                        the given {@link InputEvent}.
-   * @param aggregatorIndex The aggregatorIndex to assign to this {@link InputEvent}.
-   * @return The {@link EventKey} extracted from the given {@link InputEvent}.
-   */
-  public static EventKey createEventKey(InputEvent inputEvent,
-      DimensionsConversionContext context,
-      int aggregatorIndex)
-  {
-    GPOMutable keys = new GPOMutable(context.keyDescriptor);
-    GPOUtils.indirectCopy(keys, inputEvent.getKeys(), context.indexSubsetKeys);
-
-    if (context.outputTimebucketIndex >= 0) {
-      CustomTimeBucket timeBucket = context.dd.getCustomTimeBucket();
-
-      keys.getFieldsInteger()[context.outputTimebucketIndex] = context.customTimeBucketRegistry.getTimeBucketId(
-          timeBucket);
-      keys.getFieldsLong()[context.outputTimestampIndex] =
-          timeBucket.roundDown(inputEvent.getKeys().getFieldsLong()[context.inputTimestampIndex]);
-    }
-
-    EventKey eventKey = new EventKey(context.schemaID,
-        context.dimensionsDescriptorID,
-        context.aggregatorID,
-        keys);
-
-    return eventKey;
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractIncrementalAggregator.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregateEvent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregateEvent.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregateEvent.java
deleted file mode 100644
index e8f2f3e..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregateEvent.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import it.unimi.dsi.fastutil.Hash;
-
-/**
- * @since 3.3.0
- */
-public interface AggregateEvent
-{
-  int getAggregatorIndex();
-
-  public static interface Aggregator<EVENT, AGGREGATE extends AggregateEvent> extends Hash.Strategy<EVENT>
-  {
-    AGGREGATE getGroup(EVENT src, int aggregatorIndex);
-
-    void aggregate(AGGREGATE dest, EVENT src);
-
-    void aggregate(AGGREGATE dest, AGGREGATE src);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java
deleted file mode 100644
index c15bf25..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import java.util.List;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import com.datatorrent.api.annotation.Name;
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.schemas.Fields;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-
-/**
- * This is the average {@link OTFAggregator}.
- *
- * @since 3.1.0
- */
-@Name("AVG")
-public class AggregatorAverage implements OTFAggregator
-{
-  private static final long serialVersionUID = 20154301644L;
-
-  /**
-   * The array index of the sum aggregates in the argument list of the {@link #aggregate} function.
-   */
-  public static int SUM_INDEX = 0;
-  /**
-   * The array index of the count aggregates in the argument list of the {@link #aggregate} function.
-   */
-  public static int COUNT_INDEX = 1;
-  /**
-   * The singleton instance of this class.
-   */
-  public static final AggregatorAverage INSTANCE = new AggregatorAverage();
-
-  /**
-   * The list of {@link IncrementalAggregator}s that this {@link OTFAggregator} depends on.
-   */
-  public static final transient List<Class<? extends IncrementalAggregator>> CHILD_AGGREGATORS =
-      ImmutableList.of(AggregatorIncrementalType.SUM.getAggregator().getClass(),
-      AggregatorIncrementalType.COUNT.getAggregator().getClass());
-
-  /**
-   * Constructor for singleton pattern.
-   */
-  protected AggregatorAverage()
-  {
-    //Do nothing
-  }
-
-  @Override
-  public List<Class<? extends IncrementalAggregator>> getChildAggregators()
-  {
-    return CHILD_AGGREGATORS;
-  }
-
-  @Override
-  public GPOMutable aggregate(GPOMutable... aggregates)
-  {
-    Preconditions.checkArgument(aggregates.length == getChildAggregators().size(),
-        "The number of arguments " + aggregates.length +
-        " should be the same as the number of child aggregators " + getChildAggregators().size());
-
-    GPOMutable sumAggregation = aggregates[SUM_INDEX];
-    GPOMutable countAggregation = aggregates[COUNT_INDEX];
-
-    FieldsDescriptor fieldsDescriptor = sumAggregation.getFieldDescriptor();
-    Fields fields = fieldsDescriptor.getFields();
-    GPOMutable result = new GPOMutable(AggregatorUtils.getOutputFieldsDescriptor(fields, this));
-
-    long count = countAggregation.getFieldsLong()[0];
-
-    for (String field : fields.getFields()) {
-      Type type = sumAggregation.getFieldDescriptor().getType(field);
-
-      switch (type) {
-        case BYTE: {
-          double val = ((double)sumAggregation.getFieldByte(field)) /
-              ((double)count);
-          result.setField(field, val);
-          break;
-        }
-        case SHORT: {
-          double val = ((double)sumAggregation.getFieldShort(field)) /
-              ((double)count);
-          result.setField(field, val);
-          break;
-        }
-        case INTEGER: {
-          double val = ((double)sumAggregation.getFieldInt(field)) /
-              ((double)count);
-          result.setField(field, val);
-          break;
-        }
-        case LONG: {
-          double val = ((double)sumAggregation.getFieldLong(field)) /
-              ((double)count);
-          result.setField(field, val);
-          break;
-        }
-        case FLOAT: {
-          double val = sumAggregation.getFieldFloat(field) /
-              ((double)count);
-          result.setField(field, val);
-          break;
-        }
-        case DOUBLE: {
-          double val = sumAggregation.getFieldDouble(field) /
-              ((double)count);
-          result.setField(field, val);
-          break;
-        }
-        default: {
-          throw new UnsupportedOperationException("The type " + type + " is not supported.");
-        }
-      }
-    }
-
-    return result;
-  }
-
-  @Override
-  public Type getOutputType()
-  {
-    return Type.DOUBLE;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java
deleted file mode 100644
index 8566e1c..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import java.util.Collections;
-import java.util.Map;
-
-import com.google.common.collect.Maps;
-
-import com.datatorrent.api.annotation.Name;
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-
-/**
- * This {@link IncrementalAggregator} performs a count of the number of times an input is encountered.
- *
- * @since 3.1.0
- */
-@Name("COUNT")
-public class AggregatorCount extends AbstractIncrementalAggregator
-{
-  private static final long serialVersionUID = 20154301645L;
-
-  /**
-   * This is a map whose keys represent input types and whose values
-   * represent the corresponding output types.
-   */
-  public static final transient Map<Type, Type> TYPE_CONVERSION_MAP;
-
-  static {
-    Map<Type, Type> typeConversionMap = Maps.newHashMap();
-
-    for (Type type : Type.values()) {
-      typeConversionMap.put(type, Type.LONG);
-    }
-
-    TYPE_CONVERSION_MAP = Collections.unmodifiableMap(typeConversionMap);
-  }
-
-  public AggregatorCount()
-  {
-    //Do nothing
-  }
-
-  @Override
-  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
-  {
-    src.used = true;
-    GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor);
-    GPOMutable keys = new GPOMutable(context.keyDescriptor);
-    GPOUtils.indirectCopy(keys, src.getKeys(), context.indexSubsetKeys);
-
-    EventKey eventKey = createEventKey(src,
-        context,
-        aggregatorIndex);
-
-    long[] longFields = aggregates.getFieldsLong();
-
-    for (int index = 0;
-        index < longFields.length;
-        index++) {
-      longFields[index] = 0;
-    }
-
-    return new Aggregate(eventKey,
-        aggregates);
-  }
-
-  @Override
-  public void aggregate(Aggregate dest, InputEvent src)
-  {
-    long[] fieldsLong = dest.getAggregates().getFieldsLong();
-
-    for (int index = 0;
-        index < fieldsLong.length;
-        index++) {
-      //increment count
-      fieldsLong[index]++;
-    }
-  }
-
-  @Override
-  public void aggregate(Aggregate destAgg, Aggregate srcAgg)
-  {
-    long[] destLongs = destAgg.getAggregates().getFieldsLong();
-    long[] srcLongs = srcAgg.getAggregates().getFieldsLong();
-
-    for (int index = 0;
-        index < destLongs.length;
-        index++) {
-      //aggregate count
-      destLongs[index] += srcLongs[index];
-    }
-  }
-
-  @Override
-  public Type getOutputType(Type inputType)
-  {
-    return TYPE_CONVERSION_MAP.get(inputType);
-  }
-
-  @Override
-  public FieldsDescriptor getMetaDataDescriptor()
-  {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java
deleted file mode 100644
index f5924b8..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import com.datatorrent.api.annotation.Name;
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.appdata.gpo.Serde;
-import com.datatorrent.lib.appdata.gpo.SerdeFieldsDescriptor;
-import com.datatorrent.lib.appdata.gpo.SerdeListGPOMutable;
-import com.datatorrent.lib.appdata.gpo.SerdeObjectPayloadFix;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-
-@Name("CUM_SUM")
-/**
- * @since 3.1.0
- */
-
-public class AggregatorCumSum extends AggregatorSum
-{
-  private static final long serialVersionUID = 201506280518L;
-
-  public static final int KEY_FD_INDEX = 0;
-  public static final int AGGREGATE_FD_INDEX = 1;
-  public static final int KEYS_INDEX = 2;
-  public static final int AGGREGATES_INDEX = 3;
-
-  public static final FieldsDescriptor META_DATA_FIELDS_DESCRIPTOR;
-
-  static {
-    Map<String, Type> fieldToType = Maps.newHashMap();
-    fieldToType.put("fdkeys", Type.OBJECT);
-    fieldToType.put("fdvalues", Type.OBJECT);
-    fieldToType.put("keys", Type.OBJECT);
-    fieldToType.put("values", Type.OBJECT);
-
-    Map<String, Serde> fieldToSerde = Maps.newHashMap();
-    fieldToSerde.put("fdkeys", SerdeFieldsDescriptor.INSTANCE);
-    fieldToSerde.put("fdvalues", SerdeFieldsDescriptor.INSTANCE);
-    fieldToSerde.put("keys", SerdeListGPOMutable.INSTANCE);
-    fieldToSerde.put("values", SerdeListGPOMutable.INSTANCE);
-
-    META_DATA_FIELDS_DESCRIPTOR = new FieldsDescriptor(fieldToType,
-        fieldToSerde,
-        new PayloadFix());
-  }
-
-  public AggregatorCumSum()
-  {
-  }
-
-  @Override
-  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
-  {
-    src.used = true;
-    Aggregate agg = createAggregate(src,
-        context,
-        aggregatorIndex);
-
-    GPOUtils.indirectCopy(agg.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
-
-    GPOMutable metaData = new GPOMutable(getMetaDataDescriptor());
-
-    GPOMutable fullKey = new GPOMutable(src.getKeys());
-
-    if (context.inputTimestampIndex >= 0) {
-      fullKey.getFieldsLong()[context.inputTimestampIndex] = -1L;
-    }
-
-    List<GPOMutable> keys = Lists.newArrayList(fullKey);
-
-    GPOMutable value = new GPOMutable(agg.getAggregates());
-    List<GPOMutable> values = Lists.newArrayList(value);
-
-    metaData.getFieldsObject()[KEY_FD_INDEX] = fullKey.getFieldDescriptor();
-    metaData.getFieldsObject()[AGGREGATE_FD_INDEX] = value.getFieldDescriptor();
-    metaData.getFieldsObject()[KEYS_INDEX] = keys;
-    metaData.getFieldsObject()[AGGREGATES_INDEX] = values;
-    agg.setMetaData(metaData);
-
-    return agg;
-  }
-
-  @Override
-  public void aggregate(Aggregate dest, InputEvent src)
-  {
-    @SuppressWarnings("unchecked")
-    List<GPOMutable> destKeys =
-        (List<GPOMutable>)dest.getMetaData().getFieldsObject()[KEYS_INDEX];
-
-    @SuppressWarnings("unchecked")
-    List<GPOMutable> destAggregates =
-        (List<GPOMutable>)dest.getMetaData().getFieldsObject()[AGGREGATES_INDEX];
-
-    long timestamp = 0L;
-
-    if (context.inputTimestampIndex >= 0) {
-      timestamp = src.getKeys().getFieldsLong()[context.inputTimestampIndex];
-      src.getKeys().getFieldsLong()[context.inputTimestampIndex] = -1L;
-    }
-
-    if (!contains(destKeys, src.getKeys())) {
-      destKeys.add(new GPOMutable(src.getKeys()));
-
-      GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor);
-      GPOUtils.indirectCopy(aggregates, src.getAggregates(), context.indexSubsetAggregates);
-
-      destAggregates.add(aggregates);
-
-      this.aggregateAggs(dest.getAggregates(), aggregates);
-    }
-
-    if (context.inputTimestampIndex >= 0) {
-      src.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp;
-    }
-  }
-
-  @Override
-  public void aggregate(Aggregate dest, Aggregate src)
-  {
-    dest.getMetaData().applyObjectPayloadFix();
-    src.getMetaData().applyObjectPayloadFix();
-
-    @SuppressWarnings("unchecked")
-    List<GPOMutable> destKeys =
-        (List<GPOMutable>)dest.getMetaData().getFieldsObject()[KEYS_INDEX];
-
-    @SuppressWarnings("unchecked")
-    List<GPOMutable> srcKeys =
-        (List<GPOMutable>)src.getMetaData().getFieldsObject()[KEYS_INDEX];
-
-    @SuppressWarnings("unchecked")
-    List<GPOMutable> destAggregates =
-        (List<GPOMutable>)dest.getMetaData().getFieldsObject()[AGGREGATES_INDEX];
-
-    @SuppressWarnings("unchecked")
-    List<GPOMutable> srcAggregates =
-        (List<GPOMutable>)src.getMetaData().getFieldsObject()[AGGREGATES_INDEX];
-
-    List<GPOMutable> newKeys = Lists.newArrayList();
-    List<GPOMutable> newAggs = Lists.newArrayList();
-
-    for (int index = 0;
-        index < srcKeys.size();
-        index++) {
-      GPOMutable currentSrcKey = srcKeys.get(index);
-      GPOMutable currentSrcAgg = srcAggregates.get(index);
-
-      if (!contains(destKeys, currentSrcKey)) {
-        newKeys.add(currentSrcKey);
-        newAggs.add(currentSrcAgg);
-
-        this.aggregateAggs(dest.getAggregates(), currentSrcAgg);
-      }
-    }
-
-    destKeys.addAll(newKeys);
-    destAggregates.addAll(newAggs);
-  }
-
-  private boolean contains(List<GPOMutable> mutables, GPOMutable mutable)
-  {
-    for (int index = 0;
-        index < mutables.size();
-        index++) {
-      GPOMutable mutableFromList = mutables.get(index);
-
-      if (GPOUtils.equals(mutableFromList, mutable)) {
-        return true;
-      }
-    }
-
-    return false;
-  }
-
-  @Override
-  public FieldsDescriptor getMetaDataDescriptor()
-  {
-    return META_DATA_FIELDS_DESCRIPTOR;
-  }
-
-  public static class PayloadFix implements SerdeObjectPayloadFix
-  {
-    @Override
-    public void fix(Object[] objects)
-    {
-      FieldsDescriptor keyfd = (FieldsDescriptor)objects[KEY_FD_INDEX];
-      FieldsDescriptor valuefd = (FieldsDescriptor)objects[AGGREGATE_FD_INDEX];
-
-      @SuppressWarnings("unchecked")
-      List<GPOMutable> keyMutables = (List<GPOMutable>)objects[KEYS_INDEX];
-      @SuppressWarnings("unchecked")
-      List<GPOMutable> aggregateMutables = (List<GPOMutable>)objects[AGGREGATES_INDEX];
-
-      fix(keyfd, keyMutables);
-      fix(valuefd, aggregateMutables);
-    }
-
-    private void fix(FieldsDescriptor fd, List<GPOMutable> mutables)
-    {
-      for (int index = 0;
-          index < mutables.size();
-          index++) {
-        mutables.get(index).setFieldDescriptor(fd);
-      }
-    }
-  }
-}