Posted to commits@apex.apache.org by ti...@apache.org on 2016/03/29 00:39:44 UTC

[3/4] incubator-apex-malhar git commit: APEXMALHAR-1991 #resolve #comment Move Dimensions Computation Classes to org.apache.apex.malhar package

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java
deleted file mode 100644
index e1bf7d4..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import com.datatorrent.api.annotation.Name;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-
-/**
- * <p>
- * This aggregator creates an aggregate out of the first {@link InputEvent} encountered by this aggregator. All
- * subsequent
- * {@link InputEvent}s are ignored.
- * </p>
- * <p>
- * <b>Note:</b> when aggregates are combined in a unifier, it is not possible to tell which came first or last, so
- * one is picked arbitrarily to be the first.
- * </p>
- *
- * @since 3.1.0
- */
-@Name("FIRST")
-public class AggregatorFirst extends AbstractIncrementalAggregator
-{
-  private static final long serialVersionUID = 20154301646L;
-
-  public AggregatorFirst()
-  {
-    //Do nothing
-  }
-
-  @Override
-  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
-  {
-    Aggregate aggregate = super.getGroup(src, aggregatorIndex);
-
-    GPOUtils.indirectCopy(aggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
-
-    return aggregate;
-  }
-
-  @Override
-  public Type getOutputType(Type inputType)
-  {
-    return AggregatorUtils.IDENTITY_TYPE_MAP.get(inputType);
-  }
-
-  @Override
-  public void aggregate(Aggregate dest, InputEvent src)
-  {
-    //Ignore
-  }
-
-  @Override
-  public void aggregate(Aggregate dest, Aggregate src)
-  {
-    //Ignore
-  }
-
-  @Override
-  public FieldsDescriptor getMetaDataDescriptor()
-  {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java
deleted file mode 100644
index 09190e1..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import java.util.Collections;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-/**
- * @since 3.1.0
- */
-
-public enum AggregatorIncrementalType
-{
-  SUM(new AggregatorSum()),
-  MIN(new AggregatorMin()),
-  MAX(new AggregatorMax()),
-  COUNT(new AggregatorCount()),
-  LAST(new AggregatorLast()),
-  FIRST(new AggregatorFirst()),
-  CUM_SUM(new AggregatorCumSum());
-
-  public static final Map<String, Integer> NAME_TO_ORDINAL;
-  public static final Map<String, IncrementalAggregator> NAME_TO_AGGREGATOR;
-
-  private IncrementalAggregator aggregator;
-
-  static {
-    Map<String, Integer> nameToOrdinal = Maps.newHashMap();
-    Map<String, IncrementalAggregator> nameToAggregator = Maps.newHashMap();
-
-    for (AggregatorIncrementalType aggType : AggregatorIncrementalType.values()) {
-      nameToOrdinal.put(aggType.name(), aggType.ordinal());
-      nameToAggregator.put(aggType.name(), aggType.getAggregator());
-    }
-
-    NAME_TO_ORDINAL = Collections.unmodifiableMap(nameToOrdinal);
-    NAME_TO_AGGREGATOR = Collections.unmodifiableMap(nameToAggregator);
-  }
-
-  AggregatorIncrementalType(IncrementalAggregator aggregator)
-  {
-    setAggregator(aggregator);
-  }
-
-  private void setAggregator(IncrementalAggregator aggregator)
-  {
-    Preconditions.checkNotNull(aggregator);
-    this.aggregator = aggregator;
-  }
-
-  public IncrementalAggregator getAggregator()
-  {
-    return aggregator;
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(AggregatorIncrementalType.class);
-}
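
For reference, a minimal sketch (not part of this commit) of how the static lookup maps in the enum above might be used; the class name IncrementalTypeLookupSketch and its main method are illustrative assumptions, and the imports refer to the pre-move com.datatorrent.lib.dimensions.aggregator package:

    import com.datatorrent.lib.dimensions.aggregator.AggregatorIncrementalType;
    import com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator;

    public class IncrementalTypeLookupSketch
    {
      public static void main(String[] args)
      {
        // Resolve a default incremental aggregator and its ordinal by name,
        // as schema/operator code would do through the unmodifiable maps above.
        IncrementalAggregator sum = AggregatorIncrementalType.NAME_TO_AGGREGATOR.get("SUM");
        int sumOrdinal = AggregatorIncrementalType.NAME_TO_ORDINAL.get("SUM");
        System.out.println("SUM -> ordinal " + sumOrdinal + ", " + sum.getClass().getSimpleName());
      }
    }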

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java
deleted file mode 100644
index f727036..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import com.datatorrent.api.annotation.Name;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsEvent;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-
-/**
- * <p>
- * This aggregator creates an aggregate out of the last {@link InputEvent} encountered by this aggregator. All previous
- * {@link InputEvent}s are ignored.
- * </p>
- * <p>
- * <b>Note:</b> when aggregates are combined in a unifier, it is not possible to tell which came first or last, so
- * one is picked arbitrarily to be the last.
- * </p>
- *
- * @since 3.1.0
- */
-@Name("LAST")
-public class AggregatorLast extends AbstractIncrementalAggregator
-{
-  private static final long serialVersionUID = 20154301647L;
-
-  public AggregatorLast()
-  {
-    //Do nothing
-  }
-
-  @Override
-  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
-  {
-    Aggregate aggregate = super.getGroup(src, aggregatorIndex);
-
-    GPOUtils.indirectCopy(aggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
-
-    return aggregate;
-  }
-
-  @Override
-  public Type getOutputType(Type inputType)
-  {
-    return AggregatorUtils.IDENTITY_TYPE_MAP.get(inputType);
-  }
-
-  @Override
-  public void aggregate(Aggregate dest, InputEvent src)
-  {
-    GPOUtils.indirectCopy(dest.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
-  }
-
-  @Override
-  public void aggregate(Aggregate dest, Aggregate src)
-  {
-    DimensionsEvent.copy(dest, src);
-  }
-
-  @Override
-  public FieldsDescriptor getMetaDataDescriptor()
-  {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMax.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMax.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMax.java
deleted file mode 100644
index 25f9db2..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMax.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import com.datatorrent.api.annotation.Name;
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-
-/**
- * This {@link IncrementalAggregator} takes the max of the fields provided in the {@link InputEvent}.
- *
- * @since 3.1.0
- */
-@Name("MAX")
-public class AggregatorMax extends AbstractIncrementalAggregator
-{
-  private static final long serialVersionUID = 201503120332L;
-
-  public AggregatorMax()
-  {
-    //Do nothing
-  }
-
-  @Override
-  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
-  {
-    Aggregate aggregate = super.getGroup(src, aggregatorIndex);
-
-    GPOUtils.indirectCopy(aggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
-
-    return aggregate;
-  }
-
-  @Override
-  public void aggregate(Aggregate dest, InputEvent src)
-  {
-    GPOMutable destAggs = dest.getAggregates();
-    GPOMutable srcAggs = src.getAggregates();
-
-    {
-      byte[] destByte = destAggs.getFieldsByte();
-      if (destByte != null) {
-        byte[] srcByte = srcAggs.getFieldsByte();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsByteIndexSubset;
-        for (int index = 0;
-            index < destByte.length;
-            index++) {
-          byte tempVal = srcByte[srcIndices[index]];
-          if (destByte[index] < tempVal) {
-            destByte[index] = tempVal;
-          }
-        }
-      }
-    }
-
-    {
-      short[] destShort = destAggs.getFieldsShort();
-      if (destShort != null) {
-        short[] srcShort = srcAggs.getFieldsShort();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsShortIndexSubset;
-        for (int index = 0;
-            index < destShort.length;
-            index++) {
-          short tempVal = srcShort[srcIndices[index]];
-          if (destShort[index] < tempVal) {
-            destShort[index] = tempVal;
-          }
-        }
-      }
-    }
-
-    {
-      int[] destInteger = destAggs.getFieldsInteger();
-      if (destInteger != null) {
-        int[] srcInteger = srcAggs.getFieldsInteger();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsIntegerIndexSubset;
-        for (int index = 0;
-            index < destInteger.length;
-            index++) {
-          int tempVal = srcInteger[srcIndices[index]];
-          if (destInteger[index] < tempVal) {
-            destInteger[index] = tempVal;
-          }
-        }
-      }
-    }
-
-    {
-      long[] destLong = destAggs.getFieldsLong();
-      if (destLong != null) {
-        long[] srcLong = srcAggs.getFieldsLong();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsLongIndexSubset;
-        for (int index = 0;
-            index < destLong.length;
-            index++) {
-          long tempVal = srcLong[srcIndices[index]];
-          if (destLong[index] < tempVal) {
-            destLong[index] = tempVal;
-          }
-        }
-      }
-    }
-
-    {
-      float[] destFloat = destAggs.getFieldsFloat();
-      if (destFloat != null) {
-        float[] srcFloat = srcAggs.getFieldsFloat();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsFloatIndexSubset;
-        for (int index = 0;
-            index < destFloat.length;
-            index++) {
-          float tempVal = srcFloat[srcIndices[index]];
-          if (destFloat[index] < tempVal) {
-            destFloat[index] = tempVal;
-          }
-        }
-      }
-    }
-
-    {
-      double[] destDouble = destAggs.getFieldsDouble();
-      if (destDouble != null) {
-        double[] srcDouble = srcAggs.getFieldsDouble();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsDoubleIndexSubset;
-        for (int index = 0;
-            index < destDouble.length;
-            index++) {
-          double tempVal = srcDouble[srcIndices[index]];
-          if (destDouble[index] < tempVal) {
-            destDouble[index] = tempVal;
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public void aggregate(Aggregate dest, Aggregate src)
-  {
-    GPOMutable destAggs = dest.getAggregates();
-    GPOMutable srcAggs = src.getAggregates();
-
-    {
-      byte[] destByte = destAggs.getFieldsByte();
-      if (destByte != null) {
-        byte[] srcByte = srcAggs.getFieldsByte();
-
-        for (int index = 0;
-            index < destByte.length;
-            index++) {
-          if (destByte[index] < srcByte[index]) {
-            destByte[index] = srcByte[index];
-          }
-        }
-      }
-    }
-
-    {
-      short[] destShort = destAggs.getFieldsShort();
-      if (destShort != null) {
-        short[] srcShort = srcAggs.getFieldsShort();
-
-        for (int index = 0;
-            index < destShort.length;
-            index++) {
-          if (destShort[index] < srcShort[index]) {
-            destShort[index] = srcShort[index];
-          }
-        }
-      }
-    }
-
-    {
-      int[] destInteger = destAggs.getFieldsInteger();
-      if (destInteger != null) {
-        int[] srcInteger = srcAggs.getFieldsInteger();
-
-        for (int index = 0;
-            index < destInteger.length;
-            index++) {
-          if (destInteger[index] < srcInteger[index]) {
-            destInteger[index] = srcInteger[index];
-          }
-        }
-      }
-    }
-
-    {
-      long[] destLong = destAggs.getFieldsLong();
-      if (destLong != null) {
-        long[] srcLong = srcAggs.getFieldsLong();
-
-        for (int index = 0;
-            index < destLong.length;
-            index++) {
-          if (destLong[index] < srcLong[index]) {
-            destLong[index] = srcLong[index];
-          }
-        }
-      }
-    }
-
-    {
-      float[] destFloat = destAggs.getFieldsFloat();
-      if (destFloat != null) {
-        float[] srcFloat = srcAggs.getFieldsFloat();
-
-        for (int index = 0;
-            index < destFloat.length;
-            index++) {
-          if (destFloat[index] < srcFloat[index]) {
-            destFloat[index] = srcFloat[index];
-          }
-        }
-      }
-    }
-
-    {
-      double[] destDouble = destAggs.getFieldsDouble();
-      if (destDouble != null) {
-        double[] srcDouble = srcAggs.getFieldsDouble();
-
-        for (int index = 0;
-            index < destDouble.length;
-            index++) {
-          if (destDouble[index] < srcDouble[index]) {
-            destDouble[index] = srcDouble[index];
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public Type getOutputType(Type inputType)
-  {
-    return AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType);
-  }
-
-  @Override
-  public FieldsDescriptor getMetaDataDescriptor()
-  {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMin.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMin.java
deleted file mode 100644
index b377e9b..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMin.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import com.datatorrent.api.annotation.Name;
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-
-/**
- * This {@link IncrementalAggregator} takes the min of the fields provided in the {@link InputEvent}.
- *
- * @since 3.1.0
- */
-@Name("MIN")
-public class AggregatorMin extends AbstractIncrementalAggregator
-{
-  private static final long serialVersionUID = 20154301648L;
-
-  public AggregatorMin()
-  {
-    //Do nothing
-  }
-
-  @Override
-  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
-  {
-    Aggregate aggregate = super.getGroup(src, aggregatorIndex);
-
-    GPOUtils.indirectCopy(aggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
-
-    return aggregate;
-  }
-
-  @Override
-  public void aggregate(Aggregate dest, InputEvent src)
-  {
-    GPOMutable destAggs = dest.getAggregates();
-    GPOMutable srcAggs = src.getAggregates();
-
-    {
-      byte[] destByte = destAggs.getFieldsByte();
-      if (destByte != null) {
-        byte[] srcByte = srcAggs.getFieldsByte();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsByteIndexSubset;
-        for (int index = 0;
-            index < destByte.length;
-            index++) {
-          byte tempVal = srcByte[srcIndices[index]];
-          if (destByte[index] > tempVal) {
-            destByte[index] = tempVal;
-          }
-        }
-      }
-    }
-
-    {
-      short[] destShort = destAggs.getFieldsShort();
-      if (destShort != null) {
-        short[] srcShort = srcAggs.getFieldsShort();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsShortIndexSubset;
-        for (int index = 0;
-            index < destShort.length;
-            index++) {
-          short tempVal = srcShort[srcIndices[index]];
-          if (destShort[index] > tempVal) {
-            destShort[index] = tempVal;
-          }
-        }
-      }
-    }
-
-    {
-      int[] destInteger = destAggs.getFieldsInteger();
-      if (destInteger != null) {
-        int[] srcInteger = srcAggs.getFieldsInteger();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsIntegerIndexSubset;
-        for (int index = 0;
-            index < destInteger.length;
-            index++) {
-          int tempVal = srcInteger[srcIndices[index]];
-          if (destInteger[index] > tempVal) {
-            destInteger[index] = tempVal;
-          }
-        }
-      }
-    }
-
-    {
-      long[] destLong = destAggs.getFieldsLong();
-      if (destLong != null) {
-        long[] srcLong = srcAggs.getFieldsLong();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsLongIndexSubset;
-        for (int index = 0;
-            index < destLong.length;
-            index++) {
-          long tempVal = srcLong[srcIndices[index]];
-          if (destLong[index] > tempVal) {
-            destLong[index] = tempVal;
-          }
-        }
-      }
-    }
-
-    {
-      float[] destFloat = destAggs.getFieldsFloat();
-      if (destFloat != null) {
-        float[] srcFloat = srcAggs.getFieldsFloat();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsFloatIndexSubset;
-        for (int index = 0;
-            index < destFloat.length;
-            index++) {
-          float tempVal = srcFloat[srcIndices[index]];
-          if (destFloat[index] > tempVal) {
-            destFloat[index] = tempVal;
-          }
-        }
-      }
-    }
-
-    {
-      double[] destDouble = destAggs.getFieldsDouble();
-      if (destDouble != null) {
-        double[] srcDouble = srcAggs.getFieldsDouble();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsDoubleIndexSubset;
-        for (int index = 0;
-            index < destDouble.length;
-            index++) {
-          double tempVal = srcDouble[srcIndices[index]];
-          if (destDouble[index] > tempVal) {
-            destDouble[index] = tempVal;
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public void aggregate(Aggregate dest, Aggregate src)
-  {
-    GPOMutable destAggs = dest.getAggregates();
-    GPOMutable srcAggs = src.getAggregates();
-
-    {
-      byte[] destByte = destAggs.getFieldsByte();
-      if (destByte != null) {
-        byte[] srcByte = srcAggs.getFieldsByte();
-
-        for (int index = 0;
-            index < destByte.length;
-            index++) {
-          if (destByte[index] > srcByte[index]) {
-            destByte[index] = srcByte[index];
-          }
-        }
-      }
-    }
-
-    {
-      short[] destShort = destAggs.getFieldsShort();
-      if (destShort != null) {
-        short[] srcShort = srcAggs.getFieldsShort();
-
-        for (int index = 0;
-            index < destShort.length;
-            index++) {
-          if (destShort[index] > srcShort[index]) {
-            destShort[index] = srcShort[index];
-          }
-        }
-      }
-    }
-
-    {
-      int[] destInteger = destAggs.getFieldsInteger();
-      if (destInteger != null) {
-        int[] srcInteger = srcAggs.getFieldsInteger();
-
-        for (int index = 0;
-            index < destInteger.length;
-            index++) {
-          if (destInteger[index] > srcInteger[index]) {
-            destInteger[index] = srcInteger[index];
-          }
-        }
-      }
-    }
-
-    {
-      long[] destLong = destAggs.getFieldsLong();
-      if (destLong != null) {
-        long[] srcLong = srcAggs.getFieldsLong();
-
-        for (int index = 0;
-            index < destLong.length;
-            index++) {
-          if (destLong[index] > srcLong[index]) {
-            destLong[index] = srcLong[index];
-          }
-        }
-      }
-    }
-
-    {
-      float[] destFloat = destAggs.getFieldsFloat();
-      if (destFloat != null) {
-        float[] srcFloat = srcAggs.getFieldsFloat();
-
-        for (int index = 0;
-            index < destFloat.length;
-            index++) {
-          if (destFloat[index] > srcFloat[index]) {
-            destFloat[index] = srcFloat[index];
-          }
-        }
-      }
-    }
-
-    {
-      double[] destDouble = destAggs.getFieldsDouble();
-      if (destDouble != null) {
-        double[] srcDouble = srcAggs.getFieldsDouble();
-
-        for (int index = 0;
-            index < destDouble.length;
-            index++) {
-          if (destDouble[index] > srcDouble[index]) {
-            destDouble[index] = srcDouble[index];
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public Type getOutputType(Type inputType)
-  {
-    return AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType);
-  }
-
-  @Override
-  public FieldsDescriptor getMetaDataDescriptor()
-  {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorOTFType.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorOTFType.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorOTFType.java
deleted file mode 100644
index fd711cb..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorOTFType.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import java.util.Collections;
-import java.util.Map;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-/**
- * This is a convenience enum to store all the information about default {@link OTFAggregator}s
- * in one place.
- *
- * @since 3.1.0
- */
-public enum AggregatorOTFType
-{
-  /**
-   * The average {@link OTFAggregator}.
-   */
-  AVG(AggregatorAverage.INSTANCE);
-
-  /**
-   * A map from {@link OTFAggregator} names to {@link OTFAggregator}s.
-   */
-  public static final Map<String, OTFAggregator> NAME_TO_AGGREGATOR;
-
-  static {
-    Map<String, OTFAggregator> nameToAggregator = Maps.newHashMap();
-
-    for (AggregatorOTFType aggType : AggregatorOTFType.values()) {
-      nameToAggregator.put(aggType.name(), aggType.getAggregator());
-    }
-
-    NAME_TO_AGGREGATOR = Collections.unmodifiableMap(nameToAggregator);
-  }
-
-  /**
-   * The {@link OTFAggregator} assigned to this enum.
-   */
-  private OTFAggregator aggregator;
-
-  /**
-   * Creates an {@link OTFAggregator} enum with the given aggregator.
-   *
-   * @param aggregator The {@link OTFAggregator} assigned to this enum.
-   */
-  AggregatorOTFType(OTFAggregator aggregator)
-  {
-    setAggregator(aggregator);
-  }
-
-  /**
-   * Sets the {@link OTFAggregator} assigned to this enum.
-   *
-   * @param aggregator The {@link OTFAggregator} assigned to this enum.
-   */
-  private void setAggregator(OTFAggregator aggregator)
-  {
-    this.aggregator = Preconditions.checkNotNull(aggregator);
-  }
-
-  /**
-   * Gets the {@link OTFAggregator} assigned to this enum.
-   *
-   * @return The {@link OTFAggregator} assigned to this enum.
-   */
-  public OTFAggregator getAggregator()
-  {
-    return aggregator;
-  }
-}
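
For reference, a minimal sketch (not part of this commit) of looking up the default on-the-fly aggregator by name; the class name OTFTypeLookupSketch is an illustrative assumption and the imports refer to the pre-move package:

    import com.datatorrent.lib.dimensions.aggregator.AggregatorOTFType;
    import com.datatorrent.lib.dimensions.aggregator.OTFAggregator;

    public class OTFTypeLookupSketch
    {
      public static void main(String[] args)
      {
        // AVG is the only OTF aggregator registered in this enum by default.
        OTFAggregator avg = AggregatorOTFType.NAME_TO_AGGREGATOR.get("AVG");
        // Its child incremental aggregators are what actually get stored and combined.
        System.out.println("AVG is backed by: " + avg.getChildAggregators());
      }
    }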

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorRegistry.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorRegistry.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorRegistry.java
deleted file mode 100644
index ff5a75d..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorRegistry.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * <p>
- * This registry is used by generic dimensions computation operators and dimension stores in order to support
- * plugging different
- * aggregators into the operator. Subclasses of
- * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema} use this registry
- * to support pluggable aggregators when doing dimensions computation, and subclasses of
- * AppDataSingleSchemaDimensionStoreHDHT use this class as well.
- * </p>
- * <p>
- * The primary purpose of an {@link AggregatorRegistry} is to provide a mapping from aggregator names to aggregators,
- * and to provide mappings from aggregator IDs to aggregators. These mappings are necessary in order to correctly
- * process schemas and App Data queries, and to store aggregated data.
- * </p>
- *
- * @since 3.1.0
- */
-public class AggregatorRegistry implements Serializable
-{
-  private static final long serialVersionUID = 20154301642L;
-
-  /**
-   * This is a map from {@link IncrementalAggregator} names to {@link IncrementalAggregator}s used by the
-   * default {@link AggregatorRegistry}.
-   */
-  private static final transient Map<String, IncrementalAggregator> DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR;
-  /**
-   * This is a map from {@link OTFAggregator} names to {@link OTFAggregator}s used by the default
-   * {@link AggregatorRegistry}.
-   */
-  private static final transient Map<String, OTFAggregator> DEFAULT_NAME_TO_OTF_AGGREGATOR;
-
-  //Build the default maps
-  static {
-    DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR = Maps.newHashMap(AggregatorIncrementalType.NAME_TO_AGGREGATOR);
-    DEFAULT_NAME_TO_OTF_AGGREGATOR = Maps.newHashMap(AggregatorOTFType.NAME_TO_AGGREGATOR);
-  }
-
-  /**
-   * This is a default aggregator registry that can be used in operators.
-   */
-  public static final AggregatorRegistry DEFAULT_AGGREGATOR_REGISTRY = new AggregatorRegistry(
-      DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR, DEFAULT_NAME_TO_OTF_AGGREGATOR,
-      AggregatorIncrementalType.NAME_TO_ORDINAL);
-
-  /**
-   * This is a flag indicating whether or not this {@link AggregatorRegistry} has been set up before.
-   */
-  private transient boolean setup = false;
-  /**
-   * This is a map from the class of an {@link IncrementalAggregator} to the name of that
-   * {@link IncrementalAggregator}.
-   */
-  private transient Map<Class<? extends IncrementalAggregator>, String> classToIncrementalAggregatorName;
-  /**
-   * This is a map from the name of an {@link OTFAggregator} to the list of the names of all
-   * {@link IncrementalAggregator}s that are child aggregators of that {@link OTFAggregator}.
-   */
-  private transient Map<String, List<String>> otfAggregatorToIncrementalAggregators;
-  /**
-   * This is a map from the aggregator ID of an
-   * {@link IncrementalAggregator} to the corresponding {@link IncrementalAggregator}.
-   */
-  private transient Map<Integer, IncrementalAggregator> incrementalAggregatorIDToAggregator;
-  /**
-   * This is a map from the name assigned to an {@link IncrementalAggregator} to the {@link IncrementalAggregator}.
-   */
-  private Map<String, IncrementalAggregator> nameToIncrementalAggregator;
-  /**
-   * This is a map from the name assigned to an {@link OTFAggregator} to the {@link OTFAggregator}.
-   */
-  private Map<String, OTFAggregator> nameToOTFAggregator;
-  /**
-   * This is a map from the name of an {@link IncrementalAggregator} to the ID of that {@link IncrementalAggregator}.
-   */
-  private Map<String, Integer> incrementalAggregatorNameToID;
-
-  /**
-   * This is a helper method used to autogenerate the IDs for each {@link IncrementalAggregator}
-   *
-   * @param nameToAggregator A mapping from the name of an {@link IncrementalAggregator} to the
-   *                         {@link IncrementalAggregator}.
-   * @return A mapping from the name of an {@link IncrementalAggregator} to the ID assigned to that
-   * {@link IncrementalAggregator}.
-   */
-  private static Map<String, Integer> autoGenIds(Map<String, IncrementalAggregator> nameToAggregator)
-  {
-    Map<String, Integer> staticAggregatorNameToID = Maps.newHashMap();
-
-    for (Map.Entry<String, IncrementalAggregator> entry : nameToAggregator.entrySet()) {
-      staticAggregatorNameToID.put(entry.getKey(), stringHash(entry.getValue().getClass().getName()));
-    }
-
-    return staticAggregatorNameToID;
-  }
-
-  /**
-   * This is a helper method for computing the hash of the given string. Its implementation must never change,
-   * since the computed hash is used for aggregator IDs, which are used for persistence.
-   * <p>
-   * <b>Note:</b> Do not change this function; doing so will cause corruption for users updating existing data stores.
-   * </p>
-   *
-   * @return The hash of the given string.
-   */
-  private static int stringHash(String string)
-  {
-    int hash = 5381;
-
-    for (int index = 0;
-        index < string.length();
-        index++) {
-      int character = (int)string.charAt(index);
-      hash = hash * 33 + character;
-    }
-
-    return hash;
-  }
-
-  /**
-   * This constructor is present for Kryo serialization
-   */
-  private AggregatorRegistry()
-  {
-    //for kryo
-  }
-
-  /**
-   * <p>
-   * This creates an {@link AggregatorRegistry} which assigns the given names to the given
-   * {@link IncrementalAggregator}s and {@link OTFAggregator}s. This constructor also auto-generates
-   * the IDs associated with each {@link IncrementalAggregator} by computing the hashcode of the
-   * fully qualified class name of each {@link IncrementalAggregator}.
-   * </p>
-   * <p>
-   * <b>Note:</b> IDs only need to be generated for {@link IncrementalAggregator}s, since theirs are the
-   * only aggregations that are stored. {@link OTFAggregator}s do not require an ID since they are not stored.
-   * </p>
-   *
-   * @param nameToIncrementalAggregator This is a map from {@link String} to {@link IncrementalAggregator},
-   *                                    where the string is the name of an
-   *                                    {@link IncrementalAggregator} and the value is the {@link IncrementalAggregator}
-   *                                    with that name.
-   * @param nameToOTFAggregator         This is a map from {@link String} to {@link OTFAggregator}, where the string
-   *                                    is the name of
-   *                                    an {@link OTFAggregator} and the value is the {@link OTFAggregator} with that
-   *                                    name.
-   */
-  public AggregatorRegistry(Map<String, IncrementalAggregator> nameToIncrementalAggregator,
-      Map<String, OTFAggregator> nameToOTFAggregator)
-  {
-    this(nameToIncrementalAggregator,
-        nameToOTFAggregator,
-        autoGenIds(nameToIncrementalAggregator));
-  }
-
-  /**
-   * <p>
-   * This creates an {@link AggregatorRegistry} which assigns the given names to the given
-   * {@link IncrementalAggregator}s and {@link OTFAggregator}s. This constructor assigns IDs to each
-   * {@link IncrementalAggregator} by using the provided map from incremental aggregator names to IDs.
-   * </p>
-   * <p>
-   * <b>Note:</b> IDs only need to be generated for {@link IncrementalAggregator}s, since theirs are the
-   * only aggregations that are stored. {@link OTFAggregator}s do not require an ID since they are not stored.
-   * </p>
-   *
-   * @param nameToIncrementalAggregator   This is a map from {@link String} to {@link IncrementalAggregator},
-   *                                      where the string is the name of an
-   *                                      {@link IncrementalAggregator} and the value is the
-   *                                      {@link IncrementalAggregator}
-   *                                      with that name.
-   * @param nameToOTFAggregator           This is a map from {@link String} to {@link OTFAggregator}, where the
-   *                                      string is the name of
-   *                                      an {@link OTFAggregator} and the value is the {@link OTFAggregator} with
-   *                                      that name.
-   * @param incrementalAggregatorNameToID This is a map from the name of an {@link IncrementalAggregator} to the ID
-   *                                      for that
-   *                                      {@link IncrementalAggregator}.
-   */
-  public AggregatorRegistry(Map<String, IncrementalAggregator> nameToIncrementalAggregator,
-      Map<String, OTFAggregator> nameToOTFAggregator,
-      Map<String, Integer> incrementalAggregatorNameToID)
-  {
-    setNameToIncrementalAggregator(nameToIncrementalAggregator);
-    setNameToOTFAggregator(nameToOTFAggregator);
-
-    setIncrementalAggregatorNameToID(incrementalAggregatorNameToID);
-
-    validate();
-  }
-
-  /**
-   * This is a helper method which is used to do validation on the maps provided to the constructor of this class.
-   */
-  private void validate()
-  {
-    for (Map.Entry<String, IncrementalAggregator> entry : nameToIncrementalAggregator.entrySet()) {
-      Preconditions.checkNotNull(entry.getKey());
-      Preconditions.checkNotNull(entry.getValue());
-    }
-
-    for (Map.Entry<String, OTFAggregator> entry : nameToOTFAggregator.entrySet()) {
-      Preconditions.checkNotNull(entry.getKey());
-      Preconditions.checkNotNull(entry.getValue());
-    }
-
-    for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) {
-      Preconditions.checkNotNull(entry.getKey());
-      Preconditions.checkNotNull(entry.getValue());
-    }
-  }
-
-  /**
-   * This method is called to initialize various internal data structures of the {@link AggregatorRegistry}.
-   * This method should be called before the {@link AggregatorRegistry} is used.
-   */
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  public void setup()
-  {
-    if (setup) {
-      //If the AggregatorRegistry was already set up, don't set it up again.
-      return;
-    }
-
-    setup = true;
-
-    classToIncrementalAggregatorName = Maps.newHashMap();
-
-    for (Map.Entry<String, IncrementalAggregator> entry : nameToIncrementalAggregator.entrySet()) {
-      classToIncrementalAggregatorName.put((Class)entry.getValue().getClass(), entry.getKey());
-    }
-
-    incrementalAggregatorIDToAggregator = Maps.newHashMap();
-
-    for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) {
-      String aggregatorName = entry.getKey();
-      int aggregatorID = entry.getValue();
-      incrementalAggregatorIDToAggregator.put(aggregatorID,
-          nameToIncrementalAggregator.get(aggregatorName));
-    }
-
-    otfAggregatorToIncrementalAggregators = Maps.newHashMap();
-
-    for (Map.Entry<String, OTFAggregator> entry : nameToOTFAggregator.entrySet()) {
-      String name = entry.getKey();
-      List<String> staticAggregators = Lists.newArrayList();
-
-      OTFAggregator dotfAggregator = nameToOTFAggregator.get(name);
-
-      for (Class clazz : dotfAggregator.getChildAggregators()) {
-        staticAggregators.add(classToIncrementalAggregatorName.get(clazz));
-      }
-
-      otfAggregatorToIncrementalAggregators.put(name, staticAggregators);
-    }
-  }
-
-  /**
-   * This is a helper method which sets and validates the given mapping from an {@link IncrementalAggregator}'s name
-   * to an {@link IncrementalAggregator}.
-   *
-   * @param nameToIncrementalAggregator The mapping from an {@link IncrementalAggregator}'s name to an
-   *                                    {@link IncrementalAggregator}.
-   */
-  private void setNameToIncrementalAggregator(Map<String, IncrementalAggregator> nameToIncrementalAggregator)
-  {
-    this.nameToIncrementalAggregator = Maps.newHashMap(Preconditions.checkNotNull(nameToIncrementalAggregator));
-  }
-
-  /**
-   * This is a helper method which sets and validates the given mapping from an {@link OTFAggregator}'s name to
-   * an {@link OTFAggregator}.
-   *
-   * @param nameToOTFAggregator The mapping from an {@link OTFAggregator}'s name to an {@link OTFAggregator}.
-   */
-  private void setNameToOTFAggregator(Map<String, OTFAggregator> nameToOTFAggregator)
-  {
-    this.nameToOTFAggregator = Maps.newHashMap(Preconditions.checkNotNull(nameToOTFAggregator));
-  }
-
-  /**
-   * Checks if the given aggregatorName is the name of an {@link IncrementalAggregator} or {@link OTFAggregator}
-   * registered to this registry.
-   *
-   * @param aggregatorName The aggregator name to check.
-   * @return True if the given aggregator name is the name of an {@link IncrementalAggregator} or
-   * {@link OTFAggregator} registered to this registry. False otherwise.
-   */
-  public boolean isAggregator(String aggregatorName)
-  {
-    return classToIncrementalAggregatorName.values().contains(aggregatorName) ||
-        nameToOTFAggregator.containsKey(aggregatorName);
-  }
-
-  /**
-   * Checks if the given aggregator name is the name of an {@link IncrementalAggregator} registered
-   * to this registry.
-   *
-   * @param aggregatorName The aggregator name to check.
-   * @return True if the given aggregator name is the name of an {@link IncrementalAggregator} registered
-   * to this registry. False otherwise.
-   */
-  public boolean isIncrementalAggregator(String aggregatorName)
-  {
-    return classToIncrementalAggregatorName.values().contains(aggregatorName);
-  }
-
-  /**
-   * Gets the mapping from an {@link IncrementalAggregator}'s class to the name of that {@link IncrementalAggregator}.
-   *
-   * @return The mapping from an {@link IncrementalAggregator}'s class to the name of that {@link IncrementalAggregator}.
-   */
-  public Map<Class<? extends IncrementalAggregator>, String> getClassToIncrementalAggregatorName()
-  {
-    return classToIncrementalAggregatorName;
-  }
-
-  /**
-   * Gets the mapping from an {@link IncrementalAggregator}'s ID to the {@link IncrementalAggregator}.
-   *
-   * @return The mapping from an {@link IncrementalAggregator}'s ID to the {@link IncrementalAggregator}.
-   */
-  public Map<Integer, IncrementalAggregator> getIncrementalAggregatorIDToAggregator()
-  {
-    return incrementalAggregatorIDToAggregator;
-  }
-
-  /**
-   * This is a helper method which sets and validates the mapping from {@link IncrementalAggregator} name to
-   * {@link IncrementalAggregator} ID.
-   *
-   * @param incrementalAggregatorNameToID The mapping from {@link IncrementalAggregator} name to
-   *                                      {@link IncrementalAggregator} ID.
-   */
-  private void setIncrementalAggregatorNameToID(Map<String, Integer> incrementalAggregatorNameToID)
-  {
-    Preconditions.checkNotNull(incrementalAggregatorNameToID);
-
-    for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) {
-      Preconditions.checkNotNull(entry.getKey());
-      Preconditions.checkNotNull(entry.getValue());
-    }
-
-    this.incrementalAggregatorNameToID = Maps.newHashMap(incrementalAggregatorNameToID);
-  }
-
-  /**
-   * This returns a map from the name of an {@link IncrementalAggregator} to the corresponding ID of that
-   * {@link IncrementalAggregator}.
-   *
-   * @return A map from the name of an {@link IncrementalAggregator} to the corresponding ID of that
-   * {@link IncrementalAggregator}.
-   */
-  public Map<String, Integer> getIncrementalAggregatorNameToID()
-  {
-    return incrementalAggregatorNameToID;
-  }
-
-  /**
-   * Returns the name to {@link OTFAggregator} mapping, where the key is the name of the {@link OTFAggregator}.
-   *
-   * @return The name to {@link OTFAggregator} mapping.
-   */
-  public Map<String, OTFAggregator> getNameToOTFAggregators()
-  {
-    return nameToOTFAggregator;
-  }
-
-  /**
-   * Returns the mapping from {@link OTFAggregator} names to a list of names of all the child aggregators of
-   * that {@link OTFAggregator}.
-   *
-   * @return The mapping from {@link OTFAggregator} names to a list of names of all the child aggregators of
-   * that {@link OTFAggregator}.
-   */
-  public Map<String, List<String>> getOTFAggregatorToIncrementalAggregators()
-  {
-    return otfAggregatorToIncrementalAggregators;
-  }
-
-  /**
-   * Returns the name to {@link IncrementalAggregator} mapping, where the key is the name of the {@link IncrementalAggregator}.
-   *
-   * @return The name to {@link IncrementalAggregator} mapping.
-   */
-  public Map<String, IncrementalAggregator> getNameToIncrementalAggregator()
-  {
-    return nameToIncrementalAggregator;
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(AggregatorRegistry.class);
-}
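
For reference, a minimal sketch (not part of this commit) of how an operator or store might use the registry shown above; the class name RegistrySketch is an illustrative assumption and the imports refer to the pre-move package:

    import java.util.List;

    import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry;
    import com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator;

    public class RegistrySketch
    {
      public static void main(String[] args)
      {
        AggregatorRegistry registry = AggregatorRegistry.DEFAULT_AGGREGATOR_REGISTRY;
        registry.setup(); // builds the class-to-name, ID-to-aggregator and OTF-to-children maps

        // Name- and ID-based lookups used when processing schemas and queries.
        IncrementalAggregator max = registry.getNameToIncrementalAggregator().get("MAX");
        int maxId = registry.getIncrementalAggregatorNameToID().get("MAX");
        List<String> avgChildren = registry.getOTFAggregatorToIncrementalAggregators().get("AVG");

        System.out.println("MAX -> id " + maxId + ", " + max.getClass().getSimpleName());
        System.out.println("AVG is computed from: " + avgChildren);
      }
    }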

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorSum.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorSum.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorSum.java
deleted file mode 100644
index c68744b..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorSum.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.annotation.Name;
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-
-/**
- * This {@link IncrementalAggregator} performs a sum operation over the fields in the given {@link InputEvent}.
- *
- * @since 3.1.0
- */
-@Name("SUM")
-public class AggregatorSum extends AbstractIncrementalAggregator
-{
-  private static final long serialVersionUID = 20154301649L;
-
-  public AggregatorSum()
-  {
-    //Do nothing
-  }
-
-  @Override
-  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
-  {
-    src.used = true;
-    Aggregate aggregate = createAggregate(src,
-        context,
-        aggregatorIndex);
-
-    GPOMutable value = aggregate.getAggregates();
-    GPOUtils.zeroFillNumeric(value);
-
-    return aggregate;
-  }
-
-  @Override
-  public void aggregate(Aggregate dest, Aggregate src)
-  {
-    GPOMutable destAggs = dest.getAggregates();
-    GPOMutable srcAggs = src.getAggregates();
-
-    aggregateAggs(destAggs, srcAggs);
-  }
-
-  public void aggregateAggs(GPOMutable destAggs, GPOMutable srcAggs)
-  {
-    {
-      byte[] destByte = destAggs.getFieldsByte();
-      if (destByte != null) {
-        byte[] srcByte = srcAggs.getFieldsByte();
-
-        for (int index = 0;
-            index < destByte.length;
-            index++) {
-          destByte[index] += srcByte[index];
-        }
-      }
-    }
-
-    {
-      short[] destShort = destAggs.getFieldsShort();
-      if (destShort != null) {
-        short[] srcShort = srcAggs.getFieldsShort();
-
-        for (int index = 0;
-            index < destShort.length;
-            index++) {
-          destShort[index] += srcShort[index];
-        }
-      }
-    }
-
-    {
-      int[] destInteger = destAggs.getFieldsInteger();
-      if (destInteger != null) {
-        int[] srcInteger = srcAggs.getFieldsInteger();
-
-        for (int index = 0;
-            index < destInteger.length;
-            index++) {
-          destInteger[index] += srcInteger[index];
-        }
-      }
-    }
-
-    {
-      long[] destLong = destAggs.getFieldsLong();
-      if (destLong != null) {
-        long[] srcLong = srcAggs.getFieldsLong();
-
-        for (int index = 0;
-            index < destLong.length;
-            index++) {
-          destLong[index] += srcLong[index];
-        }
-      }
-    }
-
-    {
-      float[] destFloat = destAggs.getFieldsFloat();
-      if (destFloat != null) {
-        float[] srcFloat = srcAggs.getFieldsFloat();
-
-        for (int index = 0;
-            index < destFloat.length;
-            index++) {
-          destFloat[index] += srcFloat[index];
-        }
-      }
-    }
-
-    {
-      double[] destDouble = destAggs.getFieldsDouble();
-      if (destDouble != null) {
-        double[] srcDouble = srcAggs.getFieldsDouble();
-
-        for (int index = 0;
-            index < destDouble.length;
-            index++) {
-          destDouble[index] += srcDouble[index];
-        }
-      }
-    }
-  }
-
-  @Override
-  public void aggregate(Aggregate dest, InputEvent src)
-  {
-    GPOMutable destAggs = dest.getAggregates();
-    GPOMutable srcAggs = src.getAggregates();
-
-    aggregateInput(destAggs, srcAggs);
-  }
-
-  public void aggregateInput(GPOMutable destAggs, GPOMutable srcAggs)
-  {
-    {
-      byte[] destByte = destAggs.getFieldsByte();
-      if (destByte != null) {
-        byte[] srcByte = srcAggs.getFieldsByte();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsByteIndexSubset;
-        for (int index = 0;
-            index < destByte.length;
-            index++) {
-          destByte[index] += srcByte[srcIndices[index]];
-        }
-      }
-    }
-
-    {
-      short[] destShort = destAggs.getFieldsShort();
-      if (destShort != null) {
-        short[] srcShort = srcAggs.getFieldsShort();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsShortIndexSubset;
-        for (int index = 0;
-            index < destShort.length;
-            index++) {
-          destShort[index] += srcShort[srcIndices[index]];
-        }
-      }
-    }
-
-    {
-      int[] destInteger = destAggs.getFieldsInteger();
-      if (destInteger != null) {
-        int[] srcInteger = srcAggs.getFieldsInteger();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsIntegerIndexSubset;
-        for (int index = 0;
-            index < destInteger.length;
-            index++) {
-          destInteger[index] += srcInteger[srcIndices[index]];
-        }
-      }
-    }
-
-    {
-      long[] destLong = destAggs.getFieldsLong();
-      if (destLong != null) {
-        long[] srcLong = srcAggs.getFieldsLong();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsLongIndexSubset;
-        for (int index = 0;
-            index < destLong.length;
-            index++) {
-          destLong[index] += srcLong[srcIndices[index]];
-        }
-      }
-    }
-
-    {
-      float[] destFloat = destAggs.getFieldsFloat();
-      if (destFloat != null) {
-        float[] srcFloat = srcAggs.getFieldsFloat();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsFloatIndexSubset;
-        for (int index = 0;
-            index < destFloat.length;
-            index++) {
-          destFloat[index] += srcFloat[srcIndices[index]];
-        }
-      }
-    }
-
-    {
-      double[] destDouble = destAggs.getFieldsDouble();
-      if (destDouble != null) {
-        double[] srcDouble = srcAggs.getFieldsDouble();
-        int[] srcIndices = context.indexSubsetAggregates.fieldsDoubleIndexSubset;
-        for (int index = 0;
-            index < destDouble.length;
-            index++) {
-          destDouble[index] += srcDouble[srcIndices[index]];
-        }
-      }
-    }
-  }
-
-  @Override
-  public Type getOutputType(Type inputType)
-  {
-    return AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType);
-  }
-
-  @Override
-  public FieldsDescriptor getMetaDataDescriptor()
-  {
-    return null;
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(AggregatorSum.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorUtils.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorUtils.java
deleted file mode 100644
index 9643310..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorUtils.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import java.util.Collections;
-import java.util.Map;
-
-import com.google.common.collect.Maps;
-
-import com.datatorrent.lib.appdata.schemas.Fields;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-
-/**
- * This class contains utility methods which are useful for aggregators.
- *
- * @since 3.1.0
- */
-public final class AggregatorUtils
-{
-  /**
-   * This is an identity type map, which maps input types to the same output types.
-   */
-  public static final transient Map<Type, Type> IDENTITY_TYPE_MAP;
-  /**
-   * This is an identity type map, for numeric types only. This is
-   * helpful when creating aggregators like {@link AggregatorSum}, where the sum of ints is an
-   * int and the sum of floats is a float.
-   */
-  public static final transient Map<Type, Type> IDENTITY_NUMBER_TYPE_MAP;
-
-  static {
-    Map<Type, Type> identityTypeMap = Maps.newHashMap();
-
-    for (Type type : Type.values()) {
-      identityTypeMap.put(type, type);
-    }
-
-    IDENTITY_TYPE_MAP = Collections.unmodifiableMap(identityTypeMap);
-
-    Map<Type, Type> identityNumberTypeMap = Maps.newHashMap();
-
-    for (Type type : Type.NUMERIC_TYPES) {
-      identityNumberTypeMap.put(type, type);
-    }
-
-    IDENTITY_NUMBER_TYPE_MAP = Collections.unmodifiableMap(identityNumberTypeMap);
-  }
-
-  /**
-   * Don't instantiate this class.
-   */
-  private AggregatorUtils()
-  {
-    //Don't instantiate this class.
-  }
-
-  /**
-   * This is a helper method which takes a {@link FieldsDescriptor} object, which defines the types of the fields
-   * that the {@link IncrementalAggregator} receives as input. It then uses the given {@link IncrementalAggregator}
-   * and {@link FieldsDescriptor} object to compute the {@link FieldsDescriptor} object for the aggregation produced
-   * by the given
-   * {@link IncrementalAggregator} when it receives an input corresponding to the given input {@link FieldsDescriptor}.
-   *
-   * @param inputFieldsDescriptor This is a {@link FieldsDescriptor} object which defines the names and types of input
-   *                              data received by an aggregator.
-   * @param incrementalAggregator This is the
-   * {@link IncrementalAggregator} for which an output {@link FieldsDescriptor} needs
-   *                              to be computed.
-   * @return The output {@link FieldsDescriptor} for this aggregator when it receives input data with the same schema as
-   * the specified input {@link FieldsDescriptor}.
-   */
-  public static FieldsDescriptor getOutputFieldsDescriptor(FieldsDescriptor inputFieldsDescriptor,
-      IncrementalAggregator incrementalAggregator)
-  {
-    Map<String, Type> fieldToType = Maps.newHashMap();
-
-    for (Map.Entry<String, Type> entry :
-        inputFieldsDescriptor.getFieldToType().entrySet()) {
-      String fieldName = entry.getKey();
-      Type fieldType = entry.getValue();
-      Type outputType = incrementalAggregator.getOutputType(fieldType);
-      fieldToType.put(fieldName, outputType);
-    }
-
-    return new FieldsDescriptor(fieldToType);
-  }
-
-  /**
-   * This is a utility method which creates an output {@link FieldsDescriptor} using the field names
-   * from the given {@link FieldsDescriptor} and the output type of the given {@link OTFAggregator}.
-   *
-   * @param inputFieldsDescriptor The {@link FieldsDescriptor} from which to derive the field names used
-   *                              for the output fields descriptor.
-   * @param otfAggregator         The {@link OTFAggregator} to use for creating the output {@link FieldsDescriptor}.
-   * @return The output {@link FieldsDescriptor}.
-   */
-  public static FieldsDescriptor getOutputFieldsDescriptor(FieldsDescriptor inputFieldsDescriptor,
-      OTFAggregator otfAggregator)
-  {
-    Map<String, Type> fieldToType = Maps.newHashMap();
-
-    for (Map.Entry<String, Type> entry :
-        inputFieldsDescriptor.getFieldToType().entrySet()) {
-      String fieldName = entry.getKey();
-      Type outputType = otfAggregator.getOutputType();
-      fieldToType.put(fieldName, outputType);
-    }
-
-    return new FieldsDescriptor(fieldToType);
-  }
-
-  /**
-   * This is a utility method which creates an output {@link FieldsDescriptor} from the
-   * given field names and the given {@link OTFAggregator}.
-   *
-   * @param fields        The names of the fields to be included in the output {@link FieldsDescriptor}.
-   * @param otfAggregator The {@link OTFAggregator} to use when creating the output {@link FieldsDescriptor}.
-   * @return The output {@link FieldsDescriptor}.
-   */
-  public static FieldsDescriptor getOutputFieldsDescriptor(Fields fields,
-      OTFAggregator otfAggregator)
-  {
-    Map<String, Type> fieldToType = Maps.newHashMap();
-
-    for (String field : fields.getFields()) {
-      fieldToType.put(field, otfAggregator.getOutputType());
-    }
-
-    return new FieldsDescriptor(fieldToType);
-  }
-}
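
For reference, a minimal sketch of how getOutputFieldsDescriptor is typically used when wiring up an
incremental aggregator. The field names, the types, and the use of AggregatorSum below are illustrative
assumptions for this sketch, not something defined by this commit:

    // Describe two hypothetical input fields.
    Map<String, Type> inputTypes = Maps.newHashMap();
    inputTypes.put("cost", Type.LONG);
    inputTypes.put("revenue", Type.DOUBLE);
    FieldsDescriptor inputDescriptor = new FieldsDescriptor(inputTypes);

    // Derive the output schema for a SUM aggregator. Because SUM uses the identity number
    // type map, each output field keeps its input type (LONG stays LONG, DOUBLE stays DOUBLE).
    IncrementalAggregator sum = new AggregatorSum();
    FieldsDescriptor outputDescriptor = AggregatorUtils.getOutputFieldsDescriptor(inputDescriptor, sum);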

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/IncrementalAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/IncrementalAggregator.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/IncrementalAggregator.java
deleted file mode 100644
index 2825e0a..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/IncrementalAggregator.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsConversionContext;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-import com.datatorrent.lib.dimensions.aggregator.AggregateEvent.Aggregator;
-
-/**
- * <p>
- * {@link IncrementalAggregator}s perform aggregations in place, on a field-by-field basis. For example, if we have a
- * field called cost, an incremental aggregator would take a new value of cost and aggregate it to an aggregate value for
- * cost. No fields except the cost field are used in the computation of the cost aggregation in the case of an
- * {@link IncrementalAggregator}.
- * </p>
- * <p>
- * {@link IncrementalAggregator}s are intended to be used with subclasses of
- * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema}. The way in which
- * {@link IncrementalAggregator}s are used in this context is that a batch of fields to be aggregated by the aggregator
- * are provided in the form of an {@link InputEvent}. For example, if there are two fields (cost and revenue), which
- * will be aggregated by a sum aggregator, both of those fields will be included in the {@link InputEvent} passed to
- * the sum aggregator, and the {@link Aggregate} event produced by the sum aggregator will contain two fields,
- * one for cost and one for revenue.
- * </p>
- *
- */
-public interface IncrementalAggregator extends Aggregator<InputEvent, Aggregate>
-{
-  /**
-   * This method defines the type mapping for the {@link IncrementalAggregator}. The type mapping defines the
-   * relationship between the type of an input field and the type of its aggregate. For example if the aggregator takes
-   * a field of type int and produces an aggregate of type float, then this method would return a type of float when
-   * the given input type is an int.
-   * @param inputType The type of a field to be aggregated.
-   * @return The type of the aggregate corresponding to an input field of the given type.
-   */
-  public Type getOutputType(Type inputType);
-
-  /**
-   * Sets the {@link DimensionsConversionContext} that this aggregator uses when converting
-   * {@link InputEvent}s into {@link Aggregate}s.
-   * @param context The conversion context for this aggregator.
-   */
-  public void setDimensionsConversionContext(DimensionsConversionContext context);
-
-  /**
-   * Returns a {@link FieldsDescriptor} object which describes the meta data that is stored along with aggregations.
-   * This method returns null if this aggregator stores no metadata.
-   * @return A {@link FieldsDescriptor} object which describes the meta data that is stored along with aggregations.
-   * This method returns null if this aggregator stores no metadata.
-   */
-  public FieldsDescriptor getMetaDataDescriptor();
-}
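
As a rough sketch of this contract from the caller's side (the variable names and the use of a SUM
implementation are assumptions; in practice the dimensions computation operators drive these calls):

    // sumAggregator is assumed to be an IncrementalAggregator that was given its
    // DimensionsConversionContext beforehand.
    sumAggregator.setDimensionsConversionContext(conversionContext);

    // The aggregate is created from the first InputEvent...
    Aggregate running = sumAggregator.getGroup(firstEvent, 0);
    // ...and each subsequent InputEvent is folded into it in place, field by field.
    sumAggregator.aggregate(running, nextEvent);

    // The output schema follows the aggregator's type mapping; for SUM a LONG input stays a LONG.
    Type costOutputType = sumAggregator.getOutputType(Type.LONG);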

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/OTFAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/OTFAggregator.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/OTFAggregator.java
deleted file mode 100644
index e5d8638..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/OTFAggregator.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import java.io.Serializable;
-
-import java.util.List;
-
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.schemas.Type;
-
-/**
- * <p>
- * This interface represents an On The Fly Aggregator. On the fly aggregators represent a class
- * of aggregations which use the results of incremental aggregators, which implement the
- * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator} interface. An example of an aggregation which
- * needs to be performed on the fly is average. Average needs to be performed on the fly because it cannot be
- * computed from just an existing average and a new data item; an average requires the sum of all data items and the
- * count of all data items. An example implementation of average is {@link AggregatorAverage}. Also note
- * that, unlike {@link IncrementalAggregator}s, an {@link OTFAggregator} only has one output type. This is done
- * because {@link OTFAggregator}s usually represent a very specific computation, with a specific output type.
- * For example, average is a computation whose result you will almost always want as a double. But if you require
- * an average operation that produces an integer, that could be done as a separate {@link OTFAggregator}.
- * </p>
- * <p>
- * The primary usage for {@link OTFAggregator}s is in store operators which respond to queries. Currently,
- * the only places which utilize {@link OTFAggregator}s are subclasses of the DimensionsStoreHDHT operator.
- * </p>
- * <p>
- * This interface extends {@link Serializable} because On The Fly aggregators may be set
- * as properties on some operators and operator properties are required to be java serializable.
- * </p>
- * @since 3.1.0
- */
-public interface OTFAggregator extends Serializable
-{
-  public static final long serialVersionUID = 201505251039L;
-
-  /**
-   * This method returns all the incremental aggregators on which this aggregator depends
-   * to compute its result. In the case of {@link AggregatorAverage}, its child aggregators are
-   * {@link AggregatorCount} and {@link AggregatorSum}.
-   * @return All the incremental aggregators on which this aggregator depends to compute its
-   * result.
-   */
-
-  public List<Class<? extends IncrementalAggregator>> getChildAggregators();
-  /**
-   * This method performs an on the fly aggregation from the given aggregates. The aggregates
-   * provided to this aggregator are each the result of one of this aggregator's child aggregators.
-   * The order in which the aggregates are passed to this method is the same as the order in
-   * which the child aggregators are listed in the result of the {@link #getChildAggregators} method.
-   * Also note that this aggregator does not aggregate one field at a time. This aggregator receives
-   * a batch of fields from each child aggregator, and the result of the method is also a batch of fields.
-   * @param aggregates These are the results of all the child aggregators. The results are in the same
-   * order as the child aggregators specified in the result of the {@link #getChildAggregators} method.
-   * @return The result of the on the fly aggregation.
-   */
-
-  public GPOMutable aggregate(GPOMutable... aggregates);
-  /**
-   * Returns the output type of the {@link OTFAggregator}. <b>Note</b> that any combination of input types
-   * will produce the same output type for {@link OTFAggregator}s.
-   * @return The output type of the {@link OTFAggregator}.
-   */
-
-  public Type getOutputType();
-}
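
A brief call-site sketch of this contract (the average aggregator and its child aggregate variables are
assumed for illustration; the argument order must match the order returned by getChildAggregators()):

    // averageAggregator is assumed to be an OTFAggregator whose child aggregators are COUNT and SUM,
    // and countAggregate/sumAggregate are the GPOMutable results produced by those children.
    GPOMutable averaged = averageAggregator.aggregate(countAggregate, sumAggregate);

    // Every field of the result has the aggregator's single output type, e.g. DOUBLE for an average.
    Type resultType = averageAggregator.getOutputType();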

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/package-info.java b/library/src/main/java/com/datatorrent/lib/dimensions/package-info.java
deleted file mode 100644
index 4988df7..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-@org.apache.hadoop.classification.InterfaceStability.Evolving
-package com.datatorrent.lib.dimensions;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistry.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistry.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistry.java
new file mode 100644
index 0000000..216e577
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistry.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+
+/**
+ * @since 3.3.0
+ */
+public class CustomTimeBucketRegistry implements Serializable
+{
+  private static final long serialVersionUID = 201509221536L;
+
+  private int currentId;
+
+  private Int2ObjectMap<CustomTimeBucket> idToTimeBucket = new Int2ObjectOpenHashMap<>();
+  private Object2IntMap<CustomTimeBucket> timeBucketToId = new Object2IntOpenHashMap<>();
+  private Map<String, CustomTimeBucket> textToTimeBucket = new HashMap<>();
+
+  public CustomTimeBucketRegistry()
+  {
+  }
+
+  public CustomTimeBucketRegistry(int startingId)
+  {
+    this.currentId = startingId;
+  }
+
+  public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket)
+  {
+    int tempId = initialize(idToTimeBucket);
+
+    //Keep the supplied mapping and continue assigning IDs after its largest ID.
+    this.idToTimeBucket = Preconditions.checkNotNull(idToTimeBucket);
+    this.currentId = tempId + 1;
+  }
+
+  public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket, int startingId)
+  {
+    int tempId = initialize(idToTimeBucket);
+
+    Preconditions.checkArgument(tempId < startingId, "The startingId " + startingId
+        + " must be larger than the largest ID " + tempId + " in the given idToTimeBucket mapping");
+
+    this.idToTimeBucket = Preconditions.checkNotNull(idToTimeBucket);
+    this.currentId = startingId;
+  }
+
+  private int initialize(Int2ObjectMap<CustomTimeBucket> idToTimeBucket)
+  {
+    Preconditions.checkNotNull(idToTimeBucket);
+
+    int tempId = Integer.MIN_VALUE;
+
+    for (int timeBucketId : idToTimeBucket.keySet()) {
+      tempId = Math.max(tempId, timeBucketId);
+      CustomTimeBucket customTimeBucket = idToTimeBucket.get(timeBucketId);
+      Preconditions.checkNotNull(customTimeBucket);
+      textToTimeBucket.put(customTimeBucket.getText(), customTimeBucket);
+      timeBucketToId.put(customTimeBucket, timeBucketId);
+    }
+
+    return tempId;
+  }
+
+  public CustomTimeBucket getTimeBucket(int timeBucketId)
+  {
+    return idToTimeBucket.get(timeBucketId);
+  }
+
+  public Integer getTimeBucketId(CustomTimeBucket timeBucket)
+  {
+    if (!timeBucketToId.containsKey(timeBucket)) {
+      return null;
+    }
+
+    return timeBucketToId.get(timeBucket);
+  }
+
+  public CustomTimeBucket getTimeBucket(String text)
+  {
+    return textToTimeBucket.get(text);
+  }
+
+  public void register(CustomTimeBucket timeBucket)
+  {
+    register(timeBucket, currentId);
+  }
+
+  public void register(CustomTimeBucket timeBucket, int timeBucketId)
+  {
+    if (timeBucketToId.containsKey(timeBucket)) {
+      throw new IllegalArgumentException("The timeBucket " + timeBucket + " is already registered.");
+    }
+
+    if (timeBucketToId.containsValue(timeBucketId)) {
+      throw new IllegalArgumentException("The timeBucketId " + timeBucketId + " is already registered.");
+    }
+
+    idToTimeBucket.put(timeBucketId, timeBucket);
+    timeBucketToId.put(timeBucket, timeBucketId);
+
+    if (timeBucketId >= currentId) {
+      currentId = timeBucketId + 1;
+    }
+
+    textToTimeBucket.put(timeBucket.getText(), timeBucket);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "CustomTimeBucketRegistry{" + "idToTimeBucket=" + idToTimeBucket + '}';
+  }
+
+}
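
A short usage sketch of the registry. The text form "1m" and the String constructor of CustomTimeBucket
are assumptions made for this sketch:

    // Start assigning generated IDs at 256 (an arbitrary choice for this sketch).
    CustomTimeBucketRegistry registry = new CustomTimeBucketRegistry(256);

    CustomTimeBucket oneMinute = new CustomTimeBucket("1m");
    registry.register(oneMinute);                          // assigned ID 256; currentId advances to 257

    Integer id = registry.getTimeBucketId(oneMinute);      // 256
    CustomTimeBucket byId = registry.getTimeBucket(256);   // the same bucket
    CustomTimeBucket byText = registry.getTimeBucket("1m");

    // Registering the same bucket again, or reusing an already assigned ID, throws IllegalArgumentException.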

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsConversionContext.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsConversionContext.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsConversionContext.java
new file mode 100644
index 0000000..90d4f7b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsConversionContext.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions;
+
+import java.io.Serializable;
+
+import com.datatorrent.lib.appdata.gpo.GPOUtils.IndexSubset;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+
+/**
+ * This is a context object used to convert {@link InputEvent}s into aggregates
+ * in {@link IncrementalAggregator}s.
+ *
+ * @since 3.3.0
+ */
+public class DimensionsConversionContext implements Serializable
+{
+  private static final long serialVersionUID = 201506151157L;
+
+  public CustomTimeBucketRegistry customTimeBucketRegistry;
+  /**
+   * The schema ID for {@link Aggregate}s emitted by the
+   * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator}
+   * s holding this context.
+   */
+  public int schemaID;
+  /**
+   * The dimensionsDescriptor ID for {@link Aggregate}s emitted by the
+   * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator}
+   * s holding this context.
+   */
+  public int dimensionsDescriptorID;
+  /**
+   * The aggregator ID for {@link Aggregate}s emitted by the
+   * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator}
+   * s holding this context.
+   */
+  public int aggregatorID;
+  /**
+   * The {@link DimensionsDescriptor} corresponding to the given dimension
+   * descriptor id.
+   */
+  public DimensionsDescriptor dd;
+  /**
+   * The {@link FieldsDescriptor} for the aggregate of the {@link Aggregate}s
+   * emitted by the
+   * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator}
+   * s holding this context object.
+   */
+  public FieldsDescriptor aggregateDescriptor;
+  /**
+   * The {@link FieldsDescriptor} for the key of the {@link Aggregate}s emitted
+   * by the
+   * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator}
+   * s holding this context object.
+   */
+  public FieldsDescriptor keyDescriptor;
+  /**
+   * The index of the timestamp field within the key of {@link InputEvent}s
+   * received by the
+   * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator}
+   * s holding this context object. This is -1 if the {@link InputEvent} key has
+   * no timestamp.
+   */
+  public int inputTimestampIndex;
+  /**
+   * The index of the timestamp field within the key of {@link Aggregate}s
+   * emitted by the
+   * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator}
+   * s holding this context object. This is -1 if the {@link Aggregate}'s key
+   * has no timestamp.
+   */
+  public int outputTimestampIndex;
+  /**
+   * The index of the time bucket field within the key of {@link Aggregate}s
+   * emitted by the
+   * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator}
+   * s holding this context object. This is -1 if the {@link Aggregate}'s key
+   * has no timebucket.
+   */
+  public int outputTimebucketIndex;
+  /**
+   * The {@link IndexSubset} object that is used to extract key values from
+   * {@link InputEvent}s received by this aggregator.
+   */
+  public IndexSubset indexSubsetKeys;
+  /**
+   * The {@link IndexSubset} object that is used to extract aggregate values
+   * from {@link InputEvent}s received by this aggregator.
+   */
+  public IndexSubset indexSubsetAggregates;
+
+  /**
+   * Constructor for creating conversion context.
+   */
+  public DimensionsConversionContext()
+  {
+    //Do nothing.
+  }
+}
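
To show how these pieces fit together, here is a minimal hand-wired sketch of a conversion context being
handed to an incremental aggregator. In a real application the dimensions computation operator derives these
values from its schema; the aggregator variable and the specific values below are illustrative assumptions:

    DimensionsConversionContext context = new DimensionsConversionContext();
    context.schemaID = 0;
    context.dimensionsDescriptorID = 0;
    context.aggregatorID = 1;
    context.inputTimestampIndex = -1;     // the input key carries no timestamp in this sketch
    context.outputTimestampIndex = -1;
    context.outputTimebucketIndex = -1;
    // The descriptors (dd, keyDescriptor, aggregateDescriptor) and the index subsets would
    // also be populated by the operator before the aggregator is used.

    // incrementalAggregator is assumed to be some IncrementalAggregator instance.
    incrementalAggregator.setDimensionsConversionContext(context);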