You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2016/03/29 00:39:44 UTC
[3/4] incubator-apex-malhar git commit: APEXMALHAR-1991 #resolve
#comment Move Dimensions Computation Classes to org.apache.apex.malhar
package
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java
deleted file mode 100644
index e1bf7d4..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import com.datatorrent.api.annotation.Name;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-
-/**
- * <p>
- * This aggregator creates an aggregate out of the first {@link InputEvent} encountered by this aggregator. All
- * subsequent
- * {@link InputEvent}s are ignored.
- * </p>
- * <p>
- * <b>Note:</b> when aggregates are combined in a unifier it is not possible to tell which came first or last, so
- * one is picked arbitrarily to be the first.
- * </p>
- *
- * @since 3.1.0
- */
-@Name("FIRST")
-public class AggregatorFirst extends AbstractIncrementalAggregator
-{
- private static final long serialVersionUID = 20154301646L;
-
- public AggregatorFirst()
- {
- //Do nothing
- }
-
- @Override
- public Aggregate getGroup(InputEvent src, int aggregatorIndex)
- {
- Aggregate aggregate = super.getGroup(src, aggregatorIndex);
-
- GPOUtils.indirectCopy(aggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
-
- return aggregate;
- }
-
- @Override
- public Type getOutputType(Type inputType)
- {
- return AggregatorUtils.IDENTITY_TYPE_MAP.get(inputType);
- }
-
- @Override
- public void aggregate(Aggregate dest, InputEvent src)
- {
- //Ignore
- }
-
- @Override
- public void aggregate(Aggregate dest, Aggregate src)
- {
- //Ignore
- }
-
- @Override
- public FieldsDescriptor getMetaDataDescriptor()
- {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java
deleted file mode 100644
index 09190e1..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import java.util.Collections;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-/**
- * @since 3.1.0
- */
-
-public enum AggregatorIncrementalType
-{
- SUM(new AggregatorSum()),
- MIN(new AggregatorMin()),
- MAX(new AggregatorMax()),
- COUNT(new AggregatorCount()),
- LAST(new AggregatorLast()),
- FIRST(new AggregatorFirst()),
- CUM_SUM(new AggregatorCumSum());
-
- public static final Map<String, Integer> NAME_TO_ORDINAL;
- public static final Map<String, IncrementalAggregator> NAME_TO_AGGREGATOR;
-
- private IncrementalAggregator aggregator;
-
- static {
- Map<String, Integer> nameToOrdinal = Maps.newHashMap();
- Map<String, IncrementalAggregator> nameToAggregator = Maps.newHashMap();
-
- for (AggregatorIncrementalType aggType : AggregatorIncrementalType.values()) {
- nameToOrdinal.put(aggType.name(), aggType.ordinal());
- nameToAggregator.put(aggType.name(), aggType.getAggregator());
- }
-
- NAME_TO_ORDINAL = Collections.unmodifiableMap(nameToOrdinal);
- NAME_TO_AGGREGATOR = Collections.unmodifiableMap(nameToAggregator);
- }
-
- AggregatorIncrementalType(IncrementalAggregator aggregator)
- {
- setAggregator(aggregator);
- }
-
- private void setAggregator(IncrementalAggregator aggregator)
- {
- Preconditions.checkNotNull(aggregator);
- this.aggregator = aggregator;
- }
-
- public IncrementalAggregator getAggregator()
- {
- return aggregator;
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(AggregatorIncrementalType.class);
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java
deleted file mode 100644
index f727036..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import com.datatorrent.api.annotation.Name;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsEvent;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-
-/**
- * <p>
- * This aggregator creates an aggregate out of the last {@link InputEvent} encountered by this aggregator. All previous
- * {@link InputEvent}s are ignored.
- * </p>
- * <p>
- * <b>Note:</b> when aggregates are combined in a unifier it is not possible to tell which came first or last, so
- * one is picked arbitrarily to be the last.
- * </p>
- *
- * @since 3.1.0
- */
-@Name("LAST")
-public class AggregatorLast extends AbstractIncrementalAggregator
-{
- private static final long serialVersionUID = 20154301647L;
-
- public AggregatorLast()
- {
- //Do nothing
- }
-
- @Override
- public Aggregate getGroup(InputEvent src, int aggregatorIndex)
- {
- Aggregate aggregate = super.getGroup(src, aggregatorIndex);
-
- GPOUtils.indirectCopy(aggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
-
- return aggregate;
- }
-
- @Override
- public Type getOutputType(Type inputType)
- {
- return AggregatorUtils.IDENTITY_TYPE_MAP.get(inputType);
- }
-
- @Override
- public void aggregate(Aggregate dest, InputEvent src)
- {
- GPOUtils.indirectCopy(dest.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
- }
-
- @Override
- public void aggregate(Aggregate dest, Aggregate src)
- {
- DimensionsEvent.copy(dest, src);
- }
-
- @Override
- public FieldsDescriptor getMetaDataDescriptor()
- {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMax.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMax.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMax.java
deleted file mode 100644
index 25f9db2..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMax.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import com.datatorrent.api.annotation.Name;
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-
-/**
- * This {@link IncrementalAggregator} takes the max of the fields provided in the {@link InputEvent}.
- *
- * @since 3.1.0
- */
-@Name("MAX")
-public class AggregatorMax extends AbstractIncrementalAggregator
-{
- private static final long serialVersionUID = 201503120332L;
-
- public AggregatorMax()
- {
- //Do nothing
- }
-
- @Override
- public Aggregate getGroup(InputEvent src, int aggregatorIndex)
- {
- Aggregate aggregate = super.getGroup(src, aggregatorIndex);
-
- GPOUtils.indirectCopy(aggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
-
- return aggregate;
- }
-
- @Override
- public void aggregate(Aggregate dest, InputEvent src)
- {
- GPOMutable destAggs = dest.getAggregates();
- GPOMutable srcAggs = src.getAggregates();
-
- {
- byte[] destByte = destAggs.getFieldsByte();
- if (destByte != null) {
- byte[] srcByte = srcAggs.getFieldsByte();
- int[] srcIndices = context.indexSubsetAggregates.fieldsByteIndexSubset;
- for (int index = 0;
- index < destByte.length;
- index++) {
- byte tempVal = srcByte[srcIndices[index]];
- if (destByte[index] < tempVal) {
- destByte[index] = tempVal;
- }
- }
- }
- }
-
- {
- short[] destShort = destAggs.getFieldsShort();
- if (destShort != null) {
- short[] srcShort = srcAggs.getFieldsShort();
- int[] srcIndices = context.indexSubsetAggregates.fieldsShortIndexSubset;
- for (int index = 0;
- index < destShort.length;
- index++) {
- short tempVal = srcShort[srcIndices[index]];
- if (destShort[index] < tempVal) {
- destShort[index] = tempVal;
- }
- }
- }
- }
-
- {
- int[] destInteger = destAggs.getFieldsInteger();
- if (destInteger != null) {
- int[] srcInteger = srcAggs.getFieldsInteger();
- int[] srcIndices = context.indexSubsetAggregates.fieldsIntegerIndexSubset;
- for (int index = 0;
- index < destInteger.length;
- index++) {
- int tempVal = srcInteger[srcIndices[index]];
- if (destInteger[index] < tempVal) {
- destInteger[index] = tempVal;
- }
- }
- }
- }
-
- {
- long[] destLong = destAggs.getFieldsLong();
- if (destLong != null) {
- long[] srcLong = srcAggs.getFieldsLong();
- int[] srcIndices = context.indexSubsetAggregates.fieldsLongIndexSubset;
- for (int index = 0;
- index < destLong.length;
- index++) {
- long tempVal = srcLong[srcIndices[index]];
- if (destLong[index] < tempVal) {
- destLong[index] = tempVal;
- }
- }
- }
- }
-
- {
- float[] destFloat = destAggs.getFieldsFloat();
- if (destFloat != null) {
- float[] srcFloat = srcAggs.getFieldsFloat();
- int[] srcIndices = context.indexSubsetAggregates.fieldsFloatIndexSubset;
- for (int index = 0;
- index < destFloat.length;
- index++) {
- float tempVal = srcFloat[srcIndices[index]];
- if (destFloat[index] < tempVal) {
- destFloat[index] = tempVal;
- }
- }
- }
- }
-
- {
- double[] destDouble = destAggs.getFieldsDouble();
- if (destDouble != null) {
- double[] srcDouble = srcAggs.getFieldsDouble();
- int[] srcIndices = context.indexSubsetAggregates.fieldsDoubleIndexSubset;
- for (int index = 0;
- index < destDouble.length;
- index++) {
- double tempVal = srcDouble[srcIndices[index]];
- if (destDouble[index] < tempVal) {
- destDouble[index] = tempVal;
- }
- }
- }
- }
- }
-
- @Override
- public void aggregate(Aggregate dest, Aggregate src)
- {
- GPOMutable destAggs = dest.getAggregates();
- GPOMutable srcAggs = src.getAggregates();
-
- {
- byte[] destByte = destAggs.getFieldsByte();
- if (destByte != null) {
- byte[] srcByte = srcAggs.getFieldsByte();
-
- for (int index = 0;
- index < destByte.length;
- index++) {
- if (destByte[index] < srcByte[index]) {
- destByte[index] = srcByte[index];
- }
- }
- }
- }
-
- {
- short[] destShort = destAggs.getFieldsShort();
- if (destShort != null) {
- short[] srcShort = srcAggs.getFieldsShort();
-
- for (int index = 0;
- index < destShort.length;
- index++) {
- if (destShort[index] < srcShort[index]) {
- destShort[index] = srcShort[index];
- }
- }
- }
- }
-
- {
- int[] destInteger = destAggs.getFieldsInteger();
- if (destInteger != null) {
- int[] srcInteger = srcAggs.getFieldsInteger();
-
- for (int index = 0;
- index < destInteger.length;
- index++) {
- if (destInteger[index] < srcInteger[index]) {
- destInteger[index] = srcInteger[index];
- }
- }
- }
- }
-
- {
- long[] destLong = destAggs.getFieldsLong();
- if (destLong != null) {
- long[] srcLong = srcAggs.getFieldsLong();
-
- for (int index = 0;
- index < destLong.length;
- index++) {
- if (destLong[index] < srcLong[index]) {
- destLong[index] = srcLong[index];
- }
- }
- }
- }
-
- {
- float[] destFloat = destAggs.getFieldsFloat();
- if (destFloat != null) {
- float[] srcFloat = srcAggs.getFieldsFloat();
-
- for (int index = 0;
- index < destFloat.length;
- index++) {
- if (destFloat[index] < srcFloat[index]) {
- destFloat[index] = srcFloat[index];
- }
- }
- }
- }
-
- {
- double[] destDouble = destAggs.getFieldsDouble();
- if (destDouble != null) {
- double[] srcDouble = srcAggs.getFieldsDouble();
-
- for (int index = 0;
- index < destDouble.length;
- index++) {
- if (destDouble[index] < srcDouble[index]) {
- destDouble[index] = srcDouble[index];
- }
- }
- }
- }
- }
-
- @Override
- public Type getOutputType(Type inputType)
- {
- return AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType);
- }
-
- @Override
- public FieldsDescriptor getMetaDataDescriptor()
- {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMin.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMin.java
deleted file mode 100644
index b377e9b..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMin.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import com.datatorrent.api.annotation.Name;
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-
-/**
- * This {@link IncrementalAggregator} takes the min of the fields provided in the {@link InputEvent}.
- *
- * @since 3.1.0
- */
-@Name("MIN")
-public class AggregatorMin extends AbstractIncrementalAggregator
-{
- private static final long serialVersionUID = 20154301648L;
-
- public AggregatorMin()
- {
- //Do nothing
- }
-
- @Override
- public Aggregate getGroup(InputEvent src, int aggregatorIndex)
- {
- Aggregate aggregate = super.getGroup(src, aggregatorIndex);
-
- GPOUtils.indirectCopy(aggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
-
- return aggregate;
- }
-
- @Override
- public void aggregate(Aggregate dest, InputEvent src)
- {
- GPOMutable destAggs = dest.getAggregates();
- GPOMutable srcAggs = src.getAggregates();
-
- {
- byte[] destByte = destAggs.getFieldsByte();
- if (destByte != null) {
- byte[] srcByte = srcAggs.getFieldsByte();
- int[] srcIndices = context.indexSubsetAggregates.fieldsByteIndexSubset;
- for (int index = 0;
- index < destByte.length;
- index++) {
- byte tempVal = srcByte[srcIndices[index]];
- if (destByte[index] > tempVal) {
- destByte[index] = tempVal;
- }
- }
- }
- }
-
- {
- short[] destShort = destAggs.getFieldsShort();
- if (destShort != null) {
- short[] srcShort = srcAggs.getFieldsShort();
- int[] srcIndices = context.indexSubsetAggregates.fieldsShortIndexSubset;
- for (int index = 0;
- index < destShort.length;
- index++) {
- short tempVal = srcShort[srcIndices[index]];
- if (destShort[index] > tempVal) {
- destShort[index] = tempVal;
- }
- }
- }
- }
-
- {
- int[] destInteger = destAggs.getFieldsInteger();
- if (destInteger != null) {
- int[] srcInteger = srcAggs.getFieldsInteger();
- int[] srcIndices = context.indexSubsetAggregates.fieldsIntegerIndexSubset;
- for (int index = 0;
- index < destInteger.length;
- index++) {
- int tempVal = srcInteger[srcIndices[index]];
- if (destInteger[index] > tempVal) {
- destInteger[index] = tempVal;
- }
- }
- }
- }
-
- {
- long[] destLong = destAggs.getFieldsLong();
- if (destLong != null) {
- long[] srcLong = srcAggs.getFieldsLong();
- int[] srcIndices = context.indexSubsetAggregates.fieldsLongIndexSubset;
- for (int index = 0;
- index < destLong.length;
- index++) {
- long tempVal = srcLong[srcIndices[index]];
- if (destLong[index] > tempVal) {
- destLong[index] = tempVal;
- }
- }
- }
- }
-
- {
- float[] destFloat = destAggs.getFieldsFloat();
- if (destFloat != null) {
- float[] srcFloat = srcAggs.getFieldsFloat();
- int[] srcIndices = context.indexSubsetAggregates.fieldsFloatIndexSubset;
- for (int index = 0;
- index < destFloat.length;
- index++) {
- float tempVal = srcFloat[srcIndices[index]];
- if (destFloat[index] > tempVal) {
- destFloat[index] = tempVal;
- }
- }
- }
- }
-
- {
- double[] destDouble = destAggs.getFieldsDouble();
- if (destDouble != null) {
- double[] srcDouble = srcAggs.getFieldsDouble();
- int[] srcIndices = context.indexSubsetAggregates.fieldsDoubleIndexSubset;
- for (int index = 0;
- index < destDouble.length;
- index++) {
- double tempVal = srcDouble[srcIndices[index]];
- if (destDouble[index] > tempVal) {
- destDouble[index] = tempVal;
- }
- }
- }
- }
- }
-
- @Override
- public void aggregate(Aggregate dest, Aggregate src)
- {
- GPOMutable destAggs = dest.getAggregates();
- GPOMutable srcAggs = src.getAggregates();
-
- {
- byte[] destByte = destAggs.getFieldsByte();
- if (destByte != null) {
- byte[] srcByte = srcAggs.getFieldsByte();
-
- for (int index = 0;
- index < destByte.length;
- index++) {
- if (destByte[index] > srcByte[index]) {
- destByte[index] = srcByte[index];
- }
- }
- }
- }
-
- {
- short[] destShort = destAggs.getFieldsShort();
- if (destShort != null) {
- short[] srcShort = srcAggs.getFieldsShort();
-
- for (int index = 0;
- index < destShort.length;
- index++) {
- if (destShort[index] > srcShort[index]) {
- destShort[index] = srcShort[index];
- }
- }
- }
- }
-
- {
- int[] destInteger = destAggs.getFieldsInteger();
- if (destInteger != null) {
- int[] srcInteger = srcAggs.getFieldsInteger();
-
- for (int index = 0;
- index < destInteger.length;
- index++) {
- if (destInteger[index] > srcInteger[index]) {
- destInteger[index] = srcInteger[index];
- }
- }
- }
- }
-
- {
- long[] destLong = destAggs.getFieldsLong();
- if (destLong != null) {
- long[] srcLong = srcAggs.getFieldsLong();
-
- for (int index = 0;
- index < destLong.length;
- index++) {
- if (destLong[index] > srcLong[index]) {
- destLong[index] = srcLong[index];
- }
- }
- }
- }
-
- {
- float[] destFloat = destAggs.getFieldsFloat();
- if (destFloat != null) {
- float[] srcFloat = srcAggs.getFieldsFloat();
-
- for (int index = 0;
- index < destFloat.length;
- index++) {
- if (destFloat[index] > srcFloat[index]) {
- destFloat[index] = srcFloat[index];
- }
- }
- }
- }
-
- {
- double[] destDouble = destAggs.getFieldsDouble();
- if (destDouble != null) {
- double[] srcDouble = srcAggs.getFieldsDouble();
-
- for (int index = 0;
- index < destDouble.length;
- index++) {
- if (destDouble[index] > srcDouble[index]) {
- destDouble[index] = srcDouble[index];
- }
- }
- }
- }
- }
-
- @Override
- public Type getOutputType(Type inputType)
- {
- return AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType);
- }
-
- @Override
- public FieldsDescriptor getMetaDataDescriptor()
- {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorOTFType.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorOTFType.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorOTFType.java
deleted file mode 100644
index fd711cb..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorOTFType.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import java.util.Collections;
-import java.util.Map;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-/**
- * This is a convenience enum to store all the information about default {@link OTFAggregator}s
- * in one place.
- *
- * @since 3.1.0
- */
-public enum AggregatorOTFType
-{
- /**
- * The average {@link OTFAggregator}.
- */
- AVG(AggregatorAverage.INSTANCE);
-
- /**
- * A map from {@link OTFAggregator} names to {@link OTFAggregator}s.
- */
- public static final Map<String, OTFAggregator> NAME_TO_AGGREGATOR;
-
- static {
- Map<String, OTFAggregator> nameToAggregator = Maps.newHashMap();
-
- for (AggregatorOTFType aggType : AggregatorOTFType.values()) {
- nameToAggregator.put(aggType.name(), aggType.getAggregator());
- }
-
- NAME_TO_AGGREGATOR = Collections.unmodifiableMap(nameToAggregator);
- }
-
- /**
- * The {@link OTFAggregator} assigned to this enum.
- */
- private OTFAggregator aggregator;
-
- /**
- * Creates an {@link OTFAggregator} enum with the given aggregator.
- *
- * @param aggregator The {@link OTFAggregator} assigned to this enum.
- */
- AggregatorOTFType(OTFAggregator aggregator)
- {
- setAggregator(aggregator);
- }
-
- /**
- * Sets the {@link OTFAggregator} assigned to this enum.
- *
- * @param aggregator The {@link OTFAggregator} assigned to this enum.
- */
- private void setAggregator(OTFAggregator aggregator)
- {
- this.aggregator = Preconditions.checkNotNull(aggregator);
- }
-
- /**
- * Gets the {@link OTFAggregator} assigned to this enum.
- *
- * @return The {@link OTFAggregator} assigned to this enum.
- */
- public OTFAggregator getAggregator()
- {
- return aggregator;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorRegistry.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorRegistry.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorRegistry.java
deleted file mode 100644
index ff5a75d..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorRegistry.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * <p>
- * This registry is used by generic dimensions computation operators and dimension stores in order to support
- * plugging different
- * aggregators into the operator. Subclasses of
- * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema} use this registry
- * to support pluggable aggregators when doing dimensions computation, and Subclasses of
- * AppDataSingleSchemaDimensionStoreHDHT use this class as well.
- * </p>
- * <p>
- * The primary purpose of an {@link AggregatorRegistry} is to provide a mapping from aggregator names to aggregators,
- * and to provide mappings from aggregator IDs to aggregators. These mappings are necessary in order to correctly
- * process schemas, App Data queries, and store aggregated data.
- * </p>
- *
- * @since 3.1.0
- */
-public class AggregatorRegistry implements Serializable
-{
- private static final long serialVersionUID = 20154301642L;
-
- /**
- * This is a map from {@link IncrementalAggregator} names to {@link IncrementalAggregator}s used by the
- * default {@link AggregatorRegistry}.
- */
- private static final transient Map<String, IncrementalAggregator> DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR;
- /**
- * This is a map from {@link OTFAggregator} names to {@link OTFAggregator}s used by the default
- * {@link AggregatorRegistry}.
- */
- private static final transient Map<String, OTFAggregator> DEFAULT_NAME_TO_OTF_AGGREGATOR;
-
- //Build the default maps
- static {
- DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR = Maps.newHashMap(AggregatorIncrementalType.NAME_TO_AGGREGATOR);
- DEFAULT_NAME_TO_OTF_AGGREGATOR = Maps.newHashMap(AggregatorOTFType.NAME_TO_AGGREGATOR);
- }
-
- /**
- * This is a default aggregator registry that can be used in operators.
- */
- public static final AggregatorRegistry DEFAULT_AGGREGATOR_REGISTRY = new AggregatorRegistry(
- DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR, DEFAULT_NAME_TO_OTF_AGGREGATOR,
- AggregatorIncrementalType.NAME_TO_ORDINAL);
-
- /**
- * This is a flag indicating whether or not this {@link AggregatorRegistry} has been setup before or not.
- */
- private transient boolean setup = false;
- /**
- * This is a map from the class of an {@link IncrementalAggregator} to the name of that
- * {@link IncrementalAggregator}.
- */
- private transient Map<Class<? extends IncrementalAggregator>, String> classToIncrementalAggregatorName;
- /**
- * This is a map from the name of an {@link OTFAggregator} to the list of the names of all
- * {@link IncrementalAggregator} that are child aggregators of that {@link OTFAggregator}.
- */
- private transient Map<String, List<String>> otfAggregatorToIncrementalAggregators;
- /**
- * This is a map from the aggregator ID of an
- * {@link IncrementalAggregator} to the corresponding {@link IncrementalAggregator}.
- */
- private transient Map<Integer, IncrementalAggregator> incrementalAggregatorIDToAggregator;
- /**
- * This is a map from the name assigned to an {@link IncrementalAggregator} to the {@link IncrementalAggregator}.
- */
- private Map<String, IncrementalAggregator> nameToIncrementalAggregator;
- /**
- * This is a map from the name assigned to an {@link OTFAggregator} to the {@link OTFAggregator}.
- */
- private Map<String, OTFAggregator> nameToOTFAggregator;
- /**
- * This is a map from the name of an {@link IncrementalAggregator} to the ID of that {@link IncrementalAggregator}.
- */
- private Map<String, Integer> incrementalAggregatorNameToID;
-
- /**
- * This is a helper method used to autogenerate the IDs for each {@link IncrementalAggregator}
- *
- * @param nameToAggregator A mapping from the name of an {@link IncrementalAggregator} to the
- * {@link IncrementalAggregator}.
- * @return A mapping from the name of an {@link IncrementalAggregator} to the ID assigned to that
- * {@link IncrementalAggregator}.
- */
- private static Map<String, Integer> autoGenIds(Map<String, IncrementalAggregator> nameToAggregator)
- {
- Map<String, Integer> staticAggregatorNameToID = Maps.newHashMap();
-
- for (Map.Entry<String, IncrementalAggregator> entry : nameToAggregator.entrySet()) {
- staticAggregatorNameToID.put(entry.getKey(), stringHash(entry.getValue().getClass().getName()));
- }
-
- return staticAggregatorNameToID;
- }
-
- /**
- * This is a helper method for computing the hash of the string. This is intended to be a static unchanging
- * method since the computed hash is used for aggregator IDs which are used for persistence.
- * <p>
- * <b>Note:</b> Do not change this function it will cause corruption for users updating existing data stores.
- * </p>
- *
- * @return The hash of the given string.
- */
- private static int stringHash(String string)
- {
- int hash = 5381;
-
- for (int index = 0;
- index < string.length();
- index++) {
- int character = (int)string.charAt(index);
- hash = hash * 33 + character;
- }
-
- return hash;
- }
-
- /**
- * This constructor is present for Kryo serialization
- */
- private AggregatorRegistry()
- {
- //for kryo
- }
-
- /**
- * <p>
- * This creates an {@link AggregatorRegistry} which assigns the given names to the given
- * {@link IncrementalAggregator}s and {@link OTFAggregator}s. This constructor also auto-generates
- * the IDs associated with each {@link IncrementalAggregator} by computing the hashcode of the
- * fully qualified class name of each {@link IncrementalAggregator}.
- * </p>
- * <p>
- * <b>Note:</b> IDs only need to be generated for {@link IncrementalAggregator}s since they are the
- * only type of stored aggregations. {@link OTFAggregator}s do not require an ID since they are not stored.
- * </p>
- *
- * @param nameToIncrementalAggregator This is a map from {@link String} to {@link IncrementalAggregator},
- * where the string is the name of an
- * {@link IncrementalAggregator} and the value is the {@link IncrementalAggregator}
- * with that name.
- * @param nameToOTFAggregator This is a map from {@link String} to {@link OTFAggregator}, where the string
- * is the name of
- * an {@link OTFAggregator} and the value is the {@link OTFAggregator} with that
- * name.
- */
- public AggregatorRegistry(Map<String, IncrementalAggregator> nameToIncrementalAggregator,
- Map<String, OTFAggregator> nameToOTFAggregator)
- {
- this(nameToIncrementalAggregator,
- nameToOTFAggregator,
- autoGenIds(nameToIncrementalAggregator));
- }
-
- /**
- * <p>
- * This creates an {@link AggregatorRegistry} which assigns the given names to the given
- * {@link IncrementalAggregator}s and {@link OTFAggregator}s. This constructor assigns IDs to each
- * {@link IncrementalAggregator} by using the provided map from incremental aggregator names to IDs.
- * </p>
- * <p>
- * <b>Note:</b> IDs only need to be generated for {@link IncrementalAggregator}s since they are the
- * only type of stored aggregations. {@link OTFAggregator}s do not require an ID since they are not stored.
- * </p>
- *
- * @param nameToIncrementalAggregator This is a map from {@link String} to {@link IncrementalAggregator},
- * where the string is the name of an
- * {@link IncrementalAggregator} and the value is the
- * {@link IncrementalAggregator}
- * with that name.
- * @param nameToOTFAggregator This is a map from {@link String} to {@link OTFAggregator}, where the
- * string is the name of
- * an {@link OTFAggregator} and the value is the {@link OTFAggregator} with
- * that name.
- * @param incrementalAggregatorNameToID This is a map from the name of an {@link IncrementalAggregator} to the ID
- * for that
- * {@link IncrementalAggregator}.
- */
- public AggregatorRegistry(Map<String, IncrementalAggregator> nameToIncrementalAggregator,
- Map<String, OTFAggregator> nameToOTFAggregator,
- Map<String, Integer> incrementalAggregatorNameToID)
- {
- setNameToIncrementalAggregator(nameToIncrementalAggregator);
- setNameToOTFAggregator(nameToOTFAggregator);
-
- setIncrementalAggregatorNameToID(incrementalAggregatorNameToID);
-
- validate();
- }
-
- /**
- * This is a helper method which is used to do validation on the maps provided to the constructor of this class.
- */
- private void validate()
- {
- for (Map.Entry<String, IncrementalAggregator> entry : nameToIncrementalAggregator.entrySet()) {
- Preconditions.checkNotNull(entry.getKey());
- Preconditions.checkNotNull(entry.getValue());
- }
-
- for (Map.Entry<String, OTFAggregator> entry : nameToOTFAggregator.entrySet()) {
- Preconditions.checkNotNull(entry.getKey());
- Preconditions.checkNotNull(entry.getValue());
- }
-
- for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) {
- Preconditions.checkNotNull(entry.getKey());
- Preconditions.checkNotNull(entry.getValue());
- }
- }
-
- /**
- * This method is called to initialize various internal datastructures of the {@link AggregatorRegistry}.
- * This method should be called before the {@link AggregatorRegistry} is used.
- */
- @SuppressWarnings({"unchecked", "rawtypes"})
- public void setup()
- {
- if (setup) {
- //If the AggregatorRegistry was already setup. Don't set it up again.
- return;
- }
-
- setup = true;
-
- classToIncrementalAggregatorName = Maps.newHashMap();
-
- for (Map.Entry<String, IncrementalAggregator> entry : nameToIncrementalAggregator.entrySet()) {
- classToIncrementalAggregatorName.put((Class)entry.getValue().getClass(), entry.getKey());
- }
-
- incrementalAggregatorIDToAggregator = Maps.newHashMap();
-
- for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) {
- String aggregatorName = entry.getKey();
- int aggregatorID = entry.getValue();
- incrementalAggregatorIDToAggregator.put(aggregatorID,
- nameToIncrementalAggregator.get(aggregatorName));
- }
-
- otfAggregatorToIncrementalAggregators = Maps.newHashMap();
-
- for (Map.Entry<String, OTFAggregator> entry : nameToOTFAggregator.entrySet()) {
- String name = entry.getKey();
- List<String> staticAggregators = Lists.newArrayList();
-
- OTFAggregator dotfAggregator = nameToOTFAggregator.get(name);
-
- for (Class clazz : dotfAggregator.getChildAggregators()) {
- staticAggregators.add(classToIncrementalAggregatorName.get(clazz));
- }
-
- otfAggregatorToIncrementalAggregators.put(name, staticAggregators);
- }
- }
-
- /**
- * This is a helper method which sets and validated the given mapping from an {@link IncrementalAggregator}'s name
- * to an {@link IncrementalAggregator}.
- *
- * @param nameToIncrementalAggregator The mapping from an {@link IncrementalAggregator}'s name to an
- * {@link IncrementalAggregator}.
- */
- private void setNameToIncrementalAggregator(Map<String, IncrementalAggregator> nameToIncrementalAggregator)
- {
- this.nameToIncrementalAggregator = Maps.newHashMap(Preconditions.checkNotNull(nameToIncrementalAggregator));
- }
-
- /**
- * This is a helper method which sets and validates the given mapping from an {@link OTFAggregator}'s name to
- * an {@link OTFAggregator}.
- *
- * @param nameToOTFAggregator The mapping from an {@link OTFAggregator}'s name to an {@link OTFAggregator}.
- */
- private void setNameToOTFAggregator(Map<String, OTFAggregator> nameToOTFAggregator)
- {
- this.nameToOTFAggregator = Maps.newHashMap(Preconditions.checkNotNull(nameToOTFAggregator));
- }
-
- /**
- * Checks if the given aggregatorName is the name of an {@link IncrementalAggregator} or {@link OTFAggregator}
- * registered to this registry.
- *
- * @param aggregatorName The aggregator name to check.
- * @return True if the given aggregator name is the name of an {@link IncrementalAggregator} registered to
- * this registry. False otherwise.
- */
- public boolean isAggregator(String aggregatorName)
- {
- return classToIncrementalAggregatorName.values().contains(aggregatorName) ||
- nameToOTFAggregator.containsKey(aggregatorName);
- }
-
- /**
- * Checks if the given aggregator name is the name of an {@link IncrementalAggregator} registered
- * to this registry.
- *
- * @param aggregatorName The aggregator name to check.
- * @return True if the given aggregator name is the name of an {@link IncrementalAggregator} registered
- * to this registry. False otherwise.
- */
- public boolean isIncrementalAggregator(String aggregatorName)
- {
- return classToIncrementalAggregatorName.values().contains(aggregatorName);
- }
-
- /**
- * Gets the mapping from an {@link IncrementalAggregator}'s class to the {@link IncrementalAggregator}.
- *
- * @return The mapping from an {@link IncrementalAggregator}'s class to the {@link IncrementalAggregator}.
- */
- public Map<Class<? extends IncrementalAggregator>, String> getClassToIncrementalAggregatorName()
- {
- return classToIncrementalAggregatorName;
- }
-
- /**
- * Gets the mapping from an {@link IncrementalAggregator}'s ID to the {@link IncrementalAggregator}.
- *
- * @return The mapping from an {@link IncrementalAggregator}'s ID to the {@link IncrementalAggregator}.
- */
- public Map<Integer, IncrementalAggregator> getIncrementalAggregatorIDToAggregator()
- {
- return incrementalAggregatorIDToAggregator;
- }
-
- /**
- * This a helper method which sets and validates the mapping from {@link IncrementalAggregator} name to
- * {@link IncrementalAggregator} ID.
- *
- * @param incrementalAggregatorNameToID The mapping from {@link IncrementalAggregator} name to
- * {@link IncrementalAggregator} ID.
- */
- private void setIncrementalAggregatorNameToID(Map<String, Integer> incrementalAggregatorNameToID)
- {
- Preconditions.checkNotNull(incrementalAggregatorNameToID);
-
- for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) {
- Preconditions.checkNotNull(entry.getKey());
- Preconditions.checkNotNull(entry.getValue());
- }
-
- this.incrementalAggregatorNameToID = Maps.newHashMap(incrementalAggregatorNameToID);
- }
-
- /**
- * This returns a map from the names of an {@link IncrementalAggregator}s to the corresponding ID of the
- * {@link IncrementalAggregator}.
- *
- * @return Returns a map from the names of an {@link IncrementalAggregator} to the corresponding ID of the
- * {@link IncrementalAggregator}.
- */
- public Map<String, Integer> getIncrementalAggregatorNameToID()
- {
- return incrementalAggregatorNameToID;
- }
-
- /**
- * Returns the name to {@link OTFAggregator} mapping, where the key is the name of the {@link OTFAggregator}.
- *
- * @return The name to {@link OTFAggregator} mapping.
- */
- public Map<String, OTFAggregator> getNameToOTFAggregators()
- {
- return nameToOTFAggregator;
- }
-
- /**
- * Returns the mapping from {@link OTFAggregator} names to a list of names of all the child aggregators of
- * that {@link OTFAggregator}.
- *
- * @return The mapping from {@link OTFAggregator} names to a list of names of all the child aggregators of
- * that {@link OTFAggregator}.
- */
- public Map<String, List<String>> getOTFAggregatorToIncrementalAggregators()
- {
- return otfAggregatorToIncrementalAggregators;
- }
-
- /**
- * Returns the name to {@link IncrementalAggregator} mapping, where the key is the name of the {@link OTFAggregator}.
- *
- * @return The name to {@link IncrementalAggregator} mapping.
- */
- public Map<String, IncrementalAggregator> getNameToIncrementalAggregator()
- {
- return nameToIncrementalAggregator;
- }
-
- private static final Logger lOG = LoggerFactory.getLogger(AggregatorRegistry.class);
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorSum.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorSum.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorSum.java
deleted file mode 100644
index c68744b..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorSum.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.annotation.Name;
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-
-/**
- * This {@link IncrementalAggregator} performs a sum operation over the fields in the given {@link InputEvent}.
- *
- * @since 3.1.0
- */
-@Name("SUM")
-public class AggregatorSum extends AbstractIncrementalAggregator
-{
- private static final long serialVersionUID = 20154301649L;
-
- public AggregatorSum()
- {
- //Do nothing
- }
-
- @Override
- public Aggregate getGroup(InputEvent src, int aggregatorIndex)
- {
- src.used = true;
- Aggregate aggregate = createAggregate(src,
- context,
- aggregatorIndex);
-
- GPOMutable value = aggregate.getAggregates();
- GPOUtils.zeroFillNumeric(value);
-
- return aggregate;
- }
-
- @Override
- public void aggregate(Aggregate dest, Aggregate src)
- {
- GPOMutable destAggs = dest.getAggregates();
- GPOMutable srcAggs = src.getAggregates();
-
- aggregateAggs(destAggs, srcAggs);
- }
-
- public void aggregateAggs(GPOMutable destAggs, GPOMutable srcAggs)
- {
- {
- byte[] destByte = destAggs.getFieldsByte();
- if (destByte != null) {
- byte[] srcByte = srcAggs.getFieldsByte();
-
- for (int index = 0;
- index < destByte.length;
- index++) {
- destByte[index] += srcByte[index];
- }
- }
- }
-
- {
- short[] destShort = destAggs.getFieldsShort();
- if (destShort != null) {
- short[] srcShort = srcAggs.getFieldsShort();
-
- for (int index = 0;
- index < destShort.length;
- index++) {
- destShort[index] += srcShort[index];
- }
- }
- }
-
- {
- int[] destInteger = destAggs.getFieldsInteger();
- if (destInteger != null) {
- int[] srcInteger = srcAggs.getFieldsInteger();
-
- for (int index = 0;
- index < destInteger.length;
- index++) {
- destInteger[index] += srcInteger[index];
- }
- }
- }
-
- {
- long[] destLong = destAggs.getFieldsLong();
- if (destLong != null) {
- long[] srcLong = srcAggs.getFieldsLong();
-
- for (int index = 0;
- index < destLong.length;
- index++) {
- destLong[index] += srcLong[index];
- }
- }
- }
-
- {
- float[] destFloat = destAggs.getFieldsFloat();
- if (destFloat != null) {
- float[] srcFloat = srcAggs.getFieldsFloat();
-
- for (int index = 0;
- index < destFloat.length;
- index++) {
- destFloat[index] += srcFloat[index];
- }
- }
- }
-
- {
- double[] destDouble = destAggs.getFieldsDouble();
- if (destDouble != null) {
- double[] srcDouble = srcAggs.getFieldsDouble();
-
- for (int index = 0;
- index < destDouble.length;
- index++) {
- destDouble[index] += srcDouble[index];
- }
- }
- }
- }
-
- @Override
- public void aggregate(Aggregate dest, InputEvent src)
- {
- GPOMutable destAggs = dest.getAggregates();
- GPOMutable srcAggs = src.getAggregates();
-
- aggregateInput(destAggs, srcAggs);
- }
-
- public void aggregateInput(GPOMutable destAggs, GPOMutable srcAggs)
- {
- {
- byte[] destByte = destAggs.getFieldsByte();
- if (destByte != null) {
- byte[] srcByte = srcAggs.getFieldsByte();
- int[] srcIndices = context.indexSubsetAggregates.fieldsByteIndexSubset;
- for (int index = 0;
- index < destByte.length;
- index++) {
- destByte[index] += srcByte[srcIndices[index]];
- }
- }
- }
-
- {
- short[] destShort = destAggs.getFieldsShort();
- if (destShort != null) {
- short[] srcShort = srcAggs.getFieldsShort();
- int[] srcIndices = context.indexSubsetAggregates.fieldsShortIndexSubset;
- for (int index = 0;
- index < destShort.length;
- index++) {
- destShort[index] += srcShort[srcIndices[index]];
- }
- }
- }
-
- {
- int[] destInteger = destAggs.getFieldsInteger();
- if (destInteger != null) {
- int[] srcInteger = srcAggs.getFieldsInteger();
- int[] srcIndices = context.indexSubsetAggregates.fieldsIntegerIndexSubset;
- for (int index = 0;
- index < destInteger.length;
- index++) {
- destInteger[index] += srcInteger[srcIndices[index]];
- }
- }
- }
-
- {
- long[] destLong = destAggs.getFieldsLong();
- if (destLong != null) {
- long[] srcLong = srcAggs.getFieldsLong();
- int[] srcIndices = context.indexSubsetAggregates.fieldsLongIndexSubset;
- for (int index = 0;
- index < destLong.length;
- index++) {
- destLong[index] += srcLong[srcIndices[index]];
- }
- }
- }
-
- {
- float[] destFloat = destAggs.getFieldsFloat();
- if (destFloat != null) {
- float[] srcFloat = srcAggs.getFieldsFloat();
- int[] srcIndices = context.indexSubsetAggregates.fieldsFloatIndexSubset;
- for (int index = 0;
- index < destFloat.length;
- index++) {
- destFloat[index] += srcFloat[srcIndices[index]];
- }
- }
- }
-
- {
- double[] destDouble = destAggs.getFieldsDouble();
- if (destDouble != null) {
- double[] srcDouble = srcAggs.getFieldsDouble();
- int[] srcIndices = context.indexSubsetAggregates.fieldsDoubleIndexSubset;
- for (int index = 0;
- index < destDouble.length;
- index++) {
- destDouble[index] += srcDouble[srcIndices[index]];
- }
- }
- }
- }
-
- @Override
- public Type getOutputType(Type inputType)
- {
- return AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType);
- }
-
- @Override
- public FieldsDescriptor getMetaDataDescriptor()
- {
- return null;
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(AggregatorSum.class);
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorUtils.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorUtils.java
deleted file mode 100644
index 9643310..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorUtils.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import java.util.Collections;
-import java.util.Map;
-
-import com.google.common.collect.Maps;
-
-import com.datatorrent.lib.appdata.schemas.Fields;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-
-/**
- * This class contains utility methods which are useful for aggregators.
- *
- * @since 3.1.0
- */
-public final class AggregatorUtils
-{
- /**
- * This is an identity type map, which maps input types to the same output types.
- */
- public static final transient Map<Type, Type> IDENTITY_TYPE_MAP;
- /**
- * This is an identity type map, for numeric types only. This is
- * helpful when creating aggregators like {@link AggregatorSum}, where the sum of ints is an
- * int and the sum of floats is a float.
- */
- public static final transient Map<Type, Type> IDENTITY_NUMBER_TYPE_MAP;
-
- static {
- Map<Type, Type> identityTypeMap = Maps.newHashMap();
-
- for (Type type : Type.values()) {
- identityTypeMap.put(type, type);
- }
-
- IDENTITY_TYPE_MAP = Collections.unmodifiableMap(identityTypeMap);
-
- Map<Type, Type> identityNumberTypeMap = Maps.newHashMap();
-
- for (Type type : Type.NUMERIC_TYPES) {
- identityNumberTypeMap.put(type, type);
- }
-
- IDENTITY_NUMBER_TYPE_MAP = Collections.unmodifiableMap(identityNumberTypeMap);
- }
-
- /**
- * Don't instantiate this class.
- */
- private AggregatorUtils()
- {
- //Don't instantiate this class.
- }
-
- /**
- * This is a helper method which takes a {@link FieldsDescriptor} object, which defines the types of the fields
- * that the {@link IncrementalAggregator} receives as input. It then uses the given {@link IncrementalAggregator}
- * and {@link FieldsDescriptor} object to compute the {@link FieldsDescriptor} object for the aggregation produced
- * byte the given
- * {@link IncrementalAggregator} when it receives an input corresponding to the given input {@link FieldsDescriptor}.
- *
- * @param inputFieldsDescriptor This is a {@link FieldsDescriptor} object which defines the names and types of input
- * data recieved by an aggregator.
- * @param incrementalAggregator This is the
- * {@link IncrementalAggregator} for which an output {@link FieldsDescriptor} needs
- * to be computed.
- * @return The output {@link FieldsDescriptor} for this aggregator when it receives input data with the same schema as
- * the specified input {@link FieldsDescriptor}.
- */
- public static FieldsDescriptor getOutputFieldsDescriptor(FieldsDescriptor inputFieldsDescriptor,
- IncrementalAggregator incrementalAggregator)
- {
- Map<String, Type> fieldToType = Maps.newHashMap();
-
- for (Map.Entry<String, Type> entry :
- inputFieldsDescriptor.getFieldToType().entrySet()) {
- String fieldName = entry.getKey();
- Type fieldType = entry.getValue();
- Type outputType = incrementalAggregator.getOutputType(fieldType);
- fieldToType.put(fieldName, outputType);
- }
-
- return new FieldsDescriptor(fieldToType);
- }
-
- /**
- * This is a utility method which creates an output {@link FieldsDescriptor} using the field names
- * from the given {@link FieldsDescriptor} and the output type of the given {@link OTFAggregator}.
- *
- * @param inputFieldsDescriptor The {@link FieldsDescriptor} from which to derive the field names used
- * for the output fields descriptor.
- * @param otfAggregator The {@link OTFAggregator} to use for creating the output {@link FieldsDescriptor}.
- * @return The output {@link FieldsDescriptor}.
- */
- public static FieldsDescriptor getOutputFieldsDescriptor(FieldsDescriptor inputFieldsDescriptor,
- OTFAggregator otfAggregator)
- {
- Map<String, Type> fieldToType = Maps.newHashMap();
-
- for (Map.Entry<String, Type> entry :
- inputFieldsDescriptor.getFieldToType().entrySet()) {
- String fieldName = entry.getKey();
- Type outputType = otfAggregator.getOutputType();
- fieldToType.put(fieldName, outputType);
- }
-
- return new FieldsDescriptor(fieldToType);
- }
-
- /**
- * This is a utility method which creates an output {@link FieldsDescriptor} from the
- * given field names and the given {@link OTFAggregator}.
- *
- * @param fields The names of the fields to be included in the output {@link FieldsDescriptor}.
- * @param otfAggregator The {@link OTFAggregator} to use when creating the output {@link FieldsDescriptor}.
- * @return The output {@link FieldsDescriptor}.
- */
- public static FieldsDescriptor getOutputFieldsDescriptor(Fields fields,
- OTFAggregator otfAggregator)
- {
- Map<String, Type> fieldToType = Maps.newHashMap();
-
- for (String field : fields.getFields()) {
- fieldToType.put(field, otfAggregator.getOutputType());
- }
-
- return new FieldsDescriptor(fieldToType);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/IncrementalAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/IncrementalAggregator.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/IncrementalAggregator.java
deleted file mode 100644
index 2825e0a..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/IncrementalAggregator.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsConversionContext;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-import com.datatorrent.lib.dimensions.aggregator.AggregateEvent.Aggregator;
-
-/**
- * <p>
- * {@link IncrementalAggregator}s perform aggregations in place, on a field by field basis. For example if we have a
- * field cost, an incremental aggregator would take a new value of cost and aggregate it to an aggregate value for
- * cost. No fields except the cost field are used in the computation of the cost aggregation in the case of an
- * {@link IncrementalAggregator}.
- * </p>
- * <p>
- * {@link IncrementalAggregator}s are intended to be used with subclasses of
- * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema}. The way in which
- * {@link IncrementalAggregator}s are used in this context is that a batch of fields to be aggregated by the aggregator
- * are provided in the form of an {@link InputEvent}. For example, if there are two fields (cost and revenue), which
- * will be aggregated by a sum aggregator, both of those fields will be included in the {@link InputEvent} passed to
- * the sum aggregator. And the {DimensionsEventregate} event produced by the sum aggregator will contain two fields,
- * one for cost and one for revenue.
- * </p>
- *
- */
-public interface IncrementalAggregator extends Aggregator<InputEvent, Aggregate>
-{
- /**
- * This method defines the type mapping for the {@link IncrementalAggregator}. The type mapping defines the
- * relationship between the type of an input field and the type of its aggregate. For example if the aggregator takes
- * a field of type int and produces an aggregate of type float, then this method would return a type of float when
- * the given input type is an int.
- * @param inputType The type of a field to be aggregate.
- * @return The type of the aggregate corresponding to an input field of the given type.
- */
- public Type getOutputType(Type inputType);
-
- /**
- * This sets
- */
- public void setDimensionsConversionContext(DimensionsConversionContext context);
-
- /**
- * Returns a {@link FieldsDescriptor} object which describes the meta data that is stored along with aggregations.
- * This method returns null if this aggregator stores no metadata.
- * @return A {@link FieldsDescriptor} object which describes the meta data that is stored along with aggregations.
- * This method returns null if this aggregator stores no metadata.
- */
- public FieldsDescriptor getMetaDataDescriptor();
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/OTFAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/OTFAggregator.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/OTFAggregator.java
deleted file mode 100644
index e5d8638..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/OTFAggregator.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import java.io.Serializable;
-
-import java.util.List;
-
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.schemas.Type;
-
-/**
- * <p>
- * This interface represents an On The Fly Aggregator. On the fly aggregators represent a class
- * of aggregations which use the results of incremental aggregators, which implement the
- * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator} interface. An example of an aggregation which
- * needs to be performed on the fly is average. Average needs to be performed on the fly because average cannot be
- * computed with just an existing average and a new data item, an average required the sum of all data items, and the
- * count of all data items. An example implementation of average is {@link AggregatorAverage}. Also note
- * that unlike {@link IncrementalAggregator}s an {@link OTFAggregator} only has one output type. This done
- * because {@link OTFAggregator}s usually represent a very specific computation, with a specific output type.
- * For example, average is a computation that you will almost always want to produce a double. But if you require
- * an average operation that produces an integer, that could be done as a separate {@link OTFAggregator}.
- * </p>
- * <p>
- * The primary usage for {@link OTFAggregator}s are in store operators which respond to queries. Currently,
- * the only places which utilize {@link OTFAggregator}s are subclasses of the DimensionsStoreHDHT operator.
- * </p>
- * <p>
- * This interface extends {@link Serializable} because On The Fly aggregators may be set
- * as properties on some operators and operator properties are required to be java serializable.
- * </p>
- * @since 3.1.0
- */
-public interface OTFAggregator extends Serializable
-{
- public static final long serialVersionUID = 201505251039L;
-
- /**
- * This method returns all the incremental aggregators on which this aggregator depends on
- * to compute its result. In the case of {@link AggregatorAverage} it's child aggregators are
- * {@link AggregatorCount} and {@link AggregatorSum}.
- * @return All the incremental aggregators on which this aggregator depends on to compute its
- * result.
- */
-
- public List<Class<? extends IncrementalAggregator>> getChildAggregators();
- /**
- * This method performs an on the fly aggregation from the given aggregates. The aggregates
- * provided to this aggregator are each the result of one of this aggregators child aggregators.
- * The order in which the aggregates are passed to this method is the same as the order in
- * which the child aggregators are listed in the result of the {@link #getChildAggregators} method.
- * Also note that this aggregator does not aggregate one field at a time. This aggregator recieves
- * a batch of fields from each child aggregator, and the result of the method is also a batch of fields.
- * @param aggregates These are the results of all the child aggregators. The results are in the same
- * order as the child aggregators specified in the result of the {@link #getChildAggregators} method.
- * @return The result of the on the fly aggregation.
- */
-
- public GPOMutable aggregate(GPOMutable... aggregates);
- /**
- * Returns the output type of the {@link OTFAggregator}. <b>Note<b> that any combination of input types
- * will produce the same output type for {@link OTFAggregator}s.
- * @return The output type of the {@link OTFAggregator}.
- */
-
- public Type getOutputType();
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/package-info.java b/library/src/main/java/com/datatorrent/lib/dimensions/package-info.java
deleted file mode 100644
index 4988df7..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-@org.apache.hadoop.classification.InterfaceStability.Evolving
-package com.datatorrent.lib.dimensions;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistry.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistry.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistry.java
new file mode 100644
index 0000000..216e577
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistry.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+
+/**
+ * @since 3.3.0
+ */
+public class CustomTimeBucketRegistry implements Serializable
+{
+ private static final long serialVersionUID = 201509221536L;
+
+ private int currentId;
+
+ private Int2ObjectMap<CustomTimeBucket> idToTimeBucket = new Int2ObjectOpenHashMap<>();
+ private Object2IntMap<CustomTimeBucket> timeBucketToId = new Object2IntOpenHashMap<>();
+ private Map<String, CustomTimeBucket> textToTimeBucket = new HashMap<>();
+
+ public CustomTimeBucketRegistry()
+ {
+ }
+
+ public CustomTimeBucketRegistry(int startingId)
+ {
+ this.currentId = startingId;
+ }
+
+ public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket)
+ {
+ initialize(idToTimeBucket);
+ }
+
+ public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket, int startingId)
+ {
+ int tempId = initialize(idToTimeBucket);
+
+ Preconditions.checkArgument(tempId < startingId, "The statingId " + startingId
+ + " must be larger than the largest ID " + tempId + " in the given idToTimeBucket mapping");
+
+ this.idToTimeBucket = Preconditions.checkNotNull(idToTimeBucket);
+ this.currentId = startingId;
+ }
+
+ private int initialize(Int2ObjectMap<CustomTimeBucket> idToTimeBucket)
+ {
+ Preconditions.checkNotNull(idToTimeBucket);
+
+ int tempId = Integer.MIN_VALUE;
+
+ for (int timeBucketId : idToTimeBucket.keySet()) {
+ tempId = Math.max(tempId, timeBucketId);
+ CustomTimeBucket customTimeBucket = idToTimeBucket.get(timeBucketId);
+ textToTimeBucket.put(customTimeBucket.getText(), customTimeBucket);
+ Preconditions.checkNotNull(customTimeBucket);
+ timeBucketToId.put(customTimeBucket, timeBucketId);
+ }
+
+ return tempId;
+ }
+
+ public CustomTimeBucket getTimeBucket(int timeBucketId)
+ {
+ return idToTimeBucket.get(timeBucketId);
+ }
+
+ public Integer getTimeBucketId(CustomTimeBucket timeBucket)
+ {
+ if (!timeBucketToId.containsKey(timeBucket)) {
+ return null;
+ }
+
+ return timeBucketToId.get(timeBucket);
+ }
+
+ public CustomTimeBucket getTimeBucket(String text)
+ {
+ return textToTimeBucket.get(text);
+ }
+
+ public void register(CustomTimeBucket timeBucket)
+ {
+ register(timeBucket, currentId);
+ }
+
+ public void register(CustomTimeBucket timeBucket, int timeBucketId)
+ {
+ if (timeBucketToId.containsKey(timeBucket)) {
+ throw new IllegalArgumentException("The timeBucket " + timeBucket + " is already registered.");
+ }
+
+ if (timeBucketToId.containsValue(timeBucketId)) {
+ throw new IllegalArgumentException("The timeBucketId " + timeBucketId + " is already registered.");
+ }
+
+ idToTimeBucket.put(timeBucketId, timeBucket);
+ timeBucketToId.put(timeBucket, timeBucketId);
+
+ if (timeBucketId >= currentId) {
+ currentId = timeBucketId + 1;
+ }
+
+ textToTimeBucket.put(timeBucket.getText(), timeBucket);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CustomTimeBucketRegistry{" + "idToTimeBucket=" + idToTimeBucket + '}';
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsConversionContext.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsConversionContext.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsConversionContext.java
new file mode 100644
index 0000000..90d4f7b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsConversionContext.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions;
+
+import java.io.Serializable;
+
+import com.datatorrent.lib.appdata.gpo.GPOUtils.IndexSubset;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+
+/**
+ * This is a context object used to convert {@link InputEvent}s into aggregates
+ * in {@link IncrementalAggregator}s.
+ *
+ * @since 3.3.0
+ */
+public class DimensionsConversionContext implements Serializable
+{
+ private static final long serialVersionUID = 201506151157L;
+
+ public CustomTimeBucketRegistry customTimeBucketRegistry;
+ /**
+ * The schema ID for {@link Aggregate}s emitted by the
+ * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator}
+ * s holding this context.
+ */
+ public int schemaID;
+ /**
+ * The dimensionsDescriptor ID for {@link Aggregate}s emitted by the
+ * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator}
+ * s holding this context.
+ */
+ public int dimensionsDescriptorID;
+ /**
+ * The aggregator ID for {@link Aggregate}s emitted by the
+ * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator}
+ * s holding this context.
+ */
+ public int aggregatorID;
+ /**
+ * The {@link DimensionsDescriptor} corresponding to the given dimension
+ * descriptor id.
+ */
+ public DimensionsDescriptor dd;
+ /**
+ * The {@link FieldsDescriptor} for the aggregate of the {@link Aggregate}s
+ * emitted by the
+ * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator}
+ * s holding this context object.
+ */
+ public FieldsDescriptor aggregateDescriptor;
+ /**
+ * The {@link FieldsDescriptor} for the key of the {@link Aggregate}s emitted
+ * by the
+ * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator}
+ * s holding this context object.
+ */
+ public FieldsDescriptor keyDescriptor;
+ /**
+ * The index of the timestamp field within the key of {@link InputEvent}s
+ * received by the
+ * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator}
+ * s holding this context object. This is -1 if the {@link InputEvent} key has
+ * no timestamp.
+ */
+ public int inputTimestampIndex;
+ /**
+ * The index of the timestamp field within the key of {@link Aggregate}s
+ * emitted by the
+ * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator}
+ * s holding this context object. This is -1 if the {@link Aggregate}'s key
+ * has no timestamp.
+ */
+ public int outputTimestampIndex;
+ /**
+ * The index of the time bucket field within the key of {@link Aggregate}s
+ * emitted by the
+ * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator}
+ * s holding this context object. This is -1 if the {@link Aggregate}'s key
+ * has no timebucket.
+ */
+ public int outputTimebucketIndex;
+ /**
+ * The {@link IndexSubset} object that is used to extract key values from
+ * {@link InputEvent}s received by this aggregator.
+ */
+ public IndexSubset indexSubsetKeys;
+ /**
+ * The {@link IndexSubset} object that is used to extract aggregate values
+ * from {@link InputEvent}s received by this aggregator.
+ */
+ public IndexSubset indexSubsetAggregates;
+
+ /**
+ * Constructor for creating conversion context.
+ */
+ public DimensionsConversionContext()
+ {
+ //Do nothing.
+ }
+}