You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2016/03/29 00:39:45 UTC
[4/4] incubator-apex-malhar git commit: APEXMALHAR-1991 #resolve
#comment Move Dimensions Computation Classes to org.apache.apex.malhar
package
APEXMALHAR-1991 #resolve #comment Move Dimensions Computation Classes to org.apache.apex.malhar package
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c5cab8bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c5cab8bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c5cab8bd
Branch: refs/heads/master
Commit: c5cab8bd53913d76cbd0a6c37d56a9cca6968c89
Parents: 79eeff7
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Mon Mar 28 15:37:11 2016 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Mon Mar 28 15:37:11 2016 -0700
----------------------------------------------------------------------
.../schemas/DimensionalConfigurationSchema.java | 13 +-
.../lib/appdata/schemas/DimensionalSchema.java | 5 +-
.../db/jdbc/JDBCDimensionalOutputOperator.java | 8 +-
.../dimensions/CustomTimeBucketRegistry.java | 141 ---
.../dimensions/DimensionsConversionContext.java | 116 ---
.../lib/dimensions/DimensionsDescriptor.java | 422 ---------
.../lib/dimensions/DimensionsEvent.java | 844 ------------------
.../AbstractIncrementalAggregator.java | 190 -----
.../dimensions/aggregator/AggregateEvent.java | 38 -
.../aggregator/AggregatorAverage.java | 146 ----
.../dimensions/aggregator/AggregatorCount.java | 128 ---
.../dimensions/aggregator/AggregatorCumSum.java | 233 -----
.../dimensions/aggregator/AggregatorFirst.java | 84 --
.../aggregator/AggregatorIncrementalType.java | 79 --
.../dimensions/aggregator/AggregatorLast.java | 84 --
.../dimensions/aggregator/AggregatorMax.java | 265 ------
.../dimensions/aggregator/AggregatorMin.java | 265 ------
.../aggregator/AggregatorOTFType.java | 89 --
.../aggregator/AggregatorRegistry.java | 424 ----------
.../dimensions/aggregator/AggregatorSum.java | 254 ------
.../dimensions/aggregator/AggregatorUtils.java | 148 ----
.../aggregator/IncrementalAggregator.java | 70 --
.../dimensions/aggregator/OTFAggregator.java | 84 --
.../lib/dimensions/package-info.java | 20 -
.../dimensions/CustomTimeBucketRegistry.java | 139 +++
.../dimensions/DimensionsConversionContext.java | 116 +++
.../lib/dimensions/DimensionsDescriptor.java | 447 ++++++++++
.../malhar/lib/dimensions/DimensionsEvent.java | 848 +++++++++++++++++++
.../AbstractIncrementalAggregator.java | 191 +++++
.../dimensions/aggregator/AggregateEvent.java | 38 +
.../aggregator/AggregatorAverage.java | 146 ++++
.../dimensions/aggregator/AggregatorCount.java | 129 +++
.../dimensions/aggregator/AggregatorCumSum.java | 234 +++++
.../dimensions/aggregator/AggregatorFirst.java | 85 ++
.../aggregator/AggregatorIncrementalType.java | 79 ++
.../dimensions/aggregator/AggregatorLast.java | 85 ++
.../dimensions/aggregator/AggregatorMax.java | 266 ++++++
.../dimensions/aggregator/AggregatorMin.java | 266 ++++++
.../aggregator/AggregatorOTFType.java | 89 ++
.../aggregator/AggregatorRegistry.java | 424 ++++++++++
.../dimensions/aggregator/AggregatorSum.java | 255 ++++++
.../dimensions/aggregator/AggregatorUtils.java | 148 ++++
.../aggregator/IncrementalAggregator.java | 71 ++
.../dimensions/aggregator/OTFAggregator.java | 84 ++
.../malhar/lib/dimensions/package-info.java | 20 +
.../CustomTimeBucketRegistryTest.java | 4 +-
.../appdata/dimensions/DimensionsEventTest.java | 7 +-
.../DimensionalConfigurationSchemaTest.java | 23 +-
.../appdata/schemas/DimensionalSchemaTest.java | 15 +-
.../CustomTimeBucketRegistryTest.java | 87 --
.../dimensions/DimensionsDescriptorTest.java | 101 ---
.../CustomTimeBucketRegistryTest.java | 87 ++
.../dimensions/DimensionsDescriptorTest.java | 102 +++
53 files changed, 4387 insertions(+), 4349 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java
index 28fa119..1e048c4 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java
@@ -26,6 +26,12 @@ import java.util.Set;
import javax.validation.constraints.NotNull;
+import org.apache.apex.malhar.lib.dimensions.CustomTimeBucketRegistry;
+import org.apache.apex.malhar.lib.dimensions.DimensionsDescriptor;
+import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorRegistry;
+import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorUtils;
+import org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator;
+import org.apache.apex.malhar.lib.dimensions.aggregator.OTFAggregator;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -39,13 +45,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.datatorrent.lib.dimensions.CustomTimeBucketRegistry;
-import com.datatorrent.lib.dimensions.DimensionsDescriptor;
-import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry;
-import com.datatorrent.lib.dimensions.aggregator.AggregatorUtils;
-import com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator;
-import com.datatorrent.lib.dimensions.aggregator.OTFAggregator;
-
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntArrayList;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
index 6639d3a..30f2c1e 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorRegistry;
+import org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -35,9 +37,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry;
-import com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator;
-
/**
* The {@link DimensionalSchema} class represents the App Data dimensions schema. The App Data dimensions
* schema is built from two sources: a {@link DimensionalConfigurationSchema} and an optional schema stub. The
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
index 3021521..353f1b2 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
@@ -28,6 +28,10 @@ import java.util.Map;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
+import org.apache.apex.malhar.lib.dimensions.DimensionsDescriptor;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.EventKey;
+import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,10 +46,6 @@ import com.datatorrent.lib.appdata.schemas.DimensionalConfigurationSchema;
import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
import com.datatorrent.lib.appdata.schemas.Type;
import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;
-import com.datatorrent.lib.dimensions.DimensionsDescriptor;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey;
-import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java b/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java
deleted file mode 100644
index 0e76509..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.common.base.Preconditions;
-
-import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.objects.Object2IntMap;
-import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
-
-/**
- * @since 3.3.0
- */
-public class CustomTimeBucketRegistry implements Serializable
-{
- private static final long serialVersionUID = 201509221536L;
-
- private int currentId;
-
- private Int2ObjectMap<CustomTimeBucket> idToTimeBucket = new Int2ObjectOpenHashMap<>();
- private Object2IntMap<CustomTimeBucket> timeBucketToId = new Object2IntOpenHashMap<>();
- private Map<String, CustomTimeBucket> textToTimeBucket = new HashMap<>();
-
- public CustomTimeBucketRegistry()
- {
- }
-
- public CustomTimeBucketRegistry(int startingId)
- {
- this.currentId = startingId;
- }
-
- public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket)
- {
- initialize(idToTimeBucket);
- }
-
- public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket,
- int startingId)
- {
- int tempId = initialize(idToTimeBucket);
-
- Preconditions.checkArgument(tempId < startingId, "The statingId " + startingId
- + " must be larger than the largest ID " + tempId
- + " in the given idToTimeBucket mapping");
-
- this.idToTimeBucket = Preconditions.checkNotNull(idToTimeBucket);
- this.currentId = startingId;
- }
-
- private int initialize(Int2ObjectMap<CustomTimeBucket> idToTimeBucket)
- {
- Preconditions.checkNotNull(idToTimeBucket);
-
- int tempId = Integer.MIN_VALUE;
-
- for (int timeBucketId : idToTimeBucket.keySet()) {
- tempId = Math.max(tempId, timeBucketId);
- CustomTimeBucket customTimeBucket = idToTimeBucket.get(timeBucketId);
- textToTimeBucket.put(customTimeBucket.getText(), customTimeBucket);
- Preconditions.checkNotNull(customTimeBucket);
- timeBucketToId.put(customTimeBucket, timeBucketId);
- }
-
- return tempId;
- }
-
- public CustomTimeBucket getTimeBucket(int timeBucketId)
- {
- return idToTimeBucket.get(timeBucketId);
- }
-
- public Integer getTimeBucketId(CustomTimeBucket timeBucket)
- {
- if (!timeBucketToId.containsKey(timeBucket)) {
- return null;
- }
-
- return timeBucketToId.get(timeBucket);
- }
-
- public CustomTimeBucket getTimeBucket(String text)
- {
- return textToTimeBucket.get(text);
- }
-
- public void register(CustomTimeBucket timeBucket)
- {
- register(timeBucket, currentId);
- }
-
- public void register(CustomTimeBucket timeBucket, int timeBucketId)
- {
- if (timeBucketToId.containsKey(timeBucket)) {
- throw new IllegalArgumentException("The timeBucket " + timeBucket + " is already registered.");
- }
-
- if (timeBucketToId.containsValue(timeBucketId)) {
- throw new IllegalArgumentException("The timeBucketId " + timeBucketId + " is already registered.");
- }
-
- idToTimeBucket.put(timeBucketId, timeBucket);
- timeBucketToId.put(timeBucket, timeBucketId);
-
- if (timeBucketId >= currentId) {
- currentId = timeBucketId + 1;
- }
-
- textToTimeBucket.put(timeBucket.getText(), timeBucket);
- }
-
- @Override
- public String toString()
- {
- return "CustomTimeBucketRegistry{" + "idToTimeBucket=" + idToTimeBucket + '}';
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java
deleted file mode 100644
index dd598ff..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions;
-
-import java.io.Serializable;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils.IndexSubset;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-
-/**
- * This is a context object used to convert {@link InputEvent}s into aggregates
- * in {@link IncrementalAggregator}s.
- *
- * @since 3.3.0
- */
-public class DimensionsConversionContext implements Serializable
-{
- private static final long serialVersionUID = 201506151157L;
-
- public CustomTimeBucketRegistry customTimeBucketRegistry;
- /**
- * The schema ID for {@link Aggregate}s emitted by the
- * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
- * holding this context.
- */
- public int schemaID;
- /**
- * The dimensionsDescriptor ID for {@link Aggregate}s emitted by the
- * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
- * holding this context.
- */
- public int dimensionsDescriptorID;
- /**
- * The aggregator ID for {@link Aggregate}s emitted by the
- * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
- * holding this context.
- */
- public int aggregatorID;
- /**
- * The {@link DimensionsDescriptor} corresponding to the given dimension
- * descriptor id.
- */
- public DimensionsDescriptor dd;
- /**
- * The {@link FieldsDescriptor} for the aggregate of the {@link Aggregate}s
- * emitted by the
- * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
- * holding this context object.
- */
- public FieldsDescriptor aggregateDescriptor;
- /**
- * The {@link FieldsDescriptor} for the key of the {@link Aggregate}s emitted
- * by the
- * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
- * holding this context object.
- */
- public FieldsDescriptor keyDescriptor;
- /**
- * The index of the timestamp field within the key of {@link InputEvent}s
- * received by the
- * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
- * holding this context object. This is -1 if the {@link InputEvent} key has
- * no timestamp.
- */
- public int inputTimestampIndex;
- /**
- * The index of the timestamp field within the key of {@link Aggregate}s
- * emitted by the
- * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
- * holding this context object. This is -1 if the {@link Aggregate}'s key has
- * no timestamp.
- */
- public int outputTimestampIndex;
- /**
- * The index of the time bucket field within the key of {@link Aggregate}s
- * emitted by the
- * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
- * holding this context object. This is -1 if the {@link Aggregate}'s key has
- * no timebucket.
- */
- public int outputTimebucketIndex;
- /**
- * The {@link IndexSubset} object that is used to extract key values from
- * {@link InputEvent}s received by this aggregator.
- */
- public IndexSubset indexSubsetKeys;
- /**
- * The {@link IndexSubset} object that is used to extract aggregate values
- * from {@link InputEvent}s received by this aggregator.
- */
- public IndexSubset indexSubsetAggregates;
-
- /**
- * Constructor for creating conversion context.
- */
- public DimensionsConversionContext()
- {
- //Do nothing.
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java
deleted file mode 100644
index a1b6f96..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java
+++ /dev/null
@@ -1,422 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
-import com.datatorrent.lib.appdata.schemas.Fields;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.TimeBucket;
-import com.datatorrent.lib.appdata.schemas.Type;
-
-/**
- * <p>
- * This class defines a dimensions combination which is used by dimensions computation operators
- * and stores. A dimension combination is composed of the names of the fields that constitute the key,
- * as well as the TimeBucket under which data is stored.
- * </p>
- * <p>
- * This class supports the creation of a dimensions combination from a {@link TimeBucket} object and a set of fields.
- * It also supports the creation of a dimensions combination from an aggregation string. An aggregation string looks like
- * the following:
- * <br/>
- * <br/>
- * {@code
- * "time=MINUTES:publisher:advertiser"
- * }
- * <br/>
- * <br/>
- * In the example above <b>"time=MINUTES"</b> represents a time bucket, and the other colon separated strings represent
- * the name of fields which comprise the key for this dimension combination. When specifying a time bucket in an
- * aggregation string you must use the name of one of the TimeUnit enums.
- * </p>
- * <p>
- * One of the primary uses of a {@link DimensionsDescriptor} is for querying a dimensional data store. When a query is
- * received for a dimensional data store, the query must be mapped to many things including a dimensionDescriptorID. The
- * dimensionDescriptorID is an id assigned to a class of dimension combinations which share the same keys. This
- * mapping is
- * performed by creating a
- * {@link DimensionsDescriptor} object from the query, and then using the {@link DimensionsDescriptor} object
- * to look up the correct dimensionsDescriptorID. This lookup to retrieve a dimensionsDescriptorID is necessary
- * because a
- * dimensionsDescriptorID is used for storage in order to prevent key conflicts.
- * </p>
- *
- *
- * @since 3.3.0
- */
-public class DimensionsDescriptor implements Serializable, Comparable<DimensionsDescriptor>
-{
- private static final long serialVersionUID = 201506251237L;
-
- /**
- * Name of the reserved time field.
- */
- public static final String DIMENSION_TIME = "time";
- /**
- * Type of the reserved time field.
- */
- public static final Type DIMENSION_TIME_TYPE = Type.LONG;
- /**
- * Name of the reserved time bucket field.
- */
- public static final String DIMENSION_TIME_BUCKET = "timeBucket";
- /**
- * Type of the reserved time bucket field.
- */
- public static final Type DIMENSION_TIME_BUCKET_TYPE = Type.INTEGER;
- /**
- * The set of fields used for time, which are intended to be queried. Note that the
- * timeBucket field is not included here because it's not intended to be queried.
- */
- public static final Fields TIME_FIELDS = new Fields(Sets.newHashSet(DIMENSION_TIME));
- /**
- * This set represents the field names which cannot be part of the user defined field names in a schema for
- * dimensions computation.
- */
- public static final Set<String> RESERVED_DIMENSION_NAMES = ImmutableSet.of(DIMENSION_TIME,
- DIMENSION_TIME_BUCKET);
- /**
- * This is the equals string separator used when defining a time bucket for a dimensions combination.
- */
- public static final String DELIMETER_EQUALS = "=";
- /**
- * This separates dimensions in the dimensions combination.
- */
- public static final String DELIMETER_SEPERATOR = ":";
- /**
- * A map from a key field to its type.
- */
- public static final Map<String, Type> DIMENSION_FIELD_TO_TYPE;
-
- /**
- * The time bucket used for this dimension combination.
- */
- private TimeBucket timeBucket;
- /**
- * The custom time bucket used for this dimension combination.
- */
- private CustomTimeBucket customTimeBucket;
- /**
- * The set of key fields which compose this dimension combination.
- */
- private Fields fields;
-
- static {
- Map<String, Type> dimensionFieldToType = Maps.newHashMap();
-
- dimensionFieldToType.put(DIMENSION_TIME, DIMENSION_TIME_TYPE);
- dimensionFieldToType.put(DIMENSION_TIME_BUCKET, DIMENSION_TIME_BUCKET_TYPE);
-
- DIMENSION_FIELD_TO_TYPE = Collections.unmodifiableMap(dimensionFieldToType);
- }
-
- /**
- * Constructor for kryo serialization.
- */
- private DimensionsDescriptor()
- {
- //for kryo
- }
-
- /**
- * Creates a dimensions descriptor (dimensions combination) with the given {@link TimeBucket} and key fields.
- *
- * @param timeBucket The {@link TimeBucket} that this dimensions combination represents.
- * @param fields The key fields included in this dimensions combination.
- * @deprecated use
- * {@link #DimensionsDescriptor(com.datatorrent.lib.appdata.schemas.CustomTimeBucket,
- * com.datatorrent.lib.appdata.schemas.Fields)} instead.
- */
- @Deprecated
- public DimensionsDescriptor(TimeBucket timeBucket,
- Fields fields)
- {
- setTimeBucket(timeBucket);
- setFields(fields);
- }
-
- /**
- * Creates a dimensions descriptor (dimensions combination) with the given {@link CustomTimeBucket} and key fields.
- *
- * @param timeBucket The {@link CustomTimeBucket} that this dimensions combination represents.
- * @param fields The key fields included in this dimensions combination.
- */
- public DimensionsDescriptor(CustomTimeBucket timeBucket,
- Fields fields)
- {
- setCustomTimeBucket(timeBucket);
- setFields(fields);
- }
-
- /**
- * Creates a dimensions descriptor (dimensions combination) with the given key fields.
- *
- * @param fields The key fields included in this dimensions combination.
- */
- public DimensionsDescriptor(Fields fields)
- {
- setFields(fields);
- }
-
- /**
- * This constructor creates a dimensions descriptor (dimensions combination) from the given aggregation string.
- *
- * @param aggregationString The aggregation string to use when initializing this dimensions combination.
- */
- public DimensionsDescriptor(String aggregationString)
- {
- initialize(aggregationString);
- }
-
- /**
- * Initializes the dimensions combination with the given aggregation string.
- *
- * @param aggregationString The aggregation string with which to initialize this dimensions combination.
- */
- private void initialize(String aggregationString)
- {
- String[] fieldArray = aggregationString.split(DELIMETER_SEPERATOR);
- Set<String> fieldSet = Sets.newHashSet();
-
- for (String field : fieldArray) {
- String[] fieldAndValue = field.split(DELIMETER_EQUALS);
- String fieldName = fieldAndValue[0];
-
- if (fieldName.equals(DIMENSION_TIME_BUCKET)) {
- throw new IllegalArgumentException(DIMENSION_TIME_BUCKET + " is an invalid time.");
- }
-
- if (!fieldName.equals(DIMENSION_TIME)) {
- fieldSet.add(fieldName);
- }
-
- if (fieldName.equals(DIMENSION_TIME)) {
- if (timeBucket != null) {
- throw new IllegalArgumentException("Cannot specify time in a dimensions "
- + "descriptor when a timebucket is also "
- + "specified.");
- }
-
- if (fieldAndValue.length == 2) {
-
- timeBucket = TimeBucket.TIME_UNIT_TO_TIME_BUCKET.get(TimeUnit.valueOf(fieldAndValue[1]));
- }
- }
- }
-
- fields = new Fields(fieldSet);
- }
-
- /**
- * This is a helper method which sets and validates the {@link TimeBucket}.
- *
- * @param timeBucket The {@link TimeBucket} to set and validate.
- */
- private void setTimeBucket(TimeBucket timeBucket)
- {
- Preconditions.checkNotNull(timeBucket);
- this.timeBucket = timeBucket;
- this.customTimeBucket = new CustomTimeBucket(timeBucket);
- }
-
- /**
- * This is a helper method which sets and validates the {@link CustomTimeBucket}.
- *
- * @param customTimeBucket The {@link CustomTimeBucket} to set and validate.
- */
- private void setCustomTimeBucket(CustomTimeBucket customTimeBucket)
- {
- Preconditions.checkNotNull(customTimeBucket);
- this.customTimeBucket = customTimeBucket;
- this.timeBucket = customTimeBucket.getTimeBucket();
- }
-
- /**
- * Gets the {@link TimeBucket} for this {@link DimensionsDescriptor} object.
- *
- * @return The {@link TimeBucket} for this {@link DimensionsDescriptor} object.
- * @deprecated use {@link #getCustomTimeBucket()} instead.
- */
- @Deprecated
- public TimeBucket getTimeBucket()
- {
- return timeBucket;
- }
-
- /**
- * Gets the {@link CustomTimeBucket} for this {@link DimensionsDescriptor} object.
- *
- * @return The {@link CustomTimeBucket} for this {@link DimensionsDescriptor} object.
- */
- public CustomTimeBucket getCustomTimeBucket()
- {
- return customTimeBucket;
- }
-
- /**
- * This is a helper method which sets and validates the set of key fields for this
- * {@link DimensionsDescriptor} object.
- *
- * @param fields The set of key fields for this {@link DimensionsDescriptor} object.
- */
- private void setFields(Fields fields)
- {
- Preconditions.checkNotNull(fields);
- this.fields = fields;
- }
-
- /**
- * Returns the set of key fields for this {@link DimensionsDescriptor} object.
- *
- * @return The set of key fields for this {@link DimensionsDescriptor} object.
- */
- public Fields getFields()
- {
- return fields;
- }
-
- /**
- * This method is used to create a new {@link FieldsDescriptor} object representing this
- * {@link DimensionsDescriptor} object from another {@link FieldsDescriptor} object which
- * defines the names and types of all the available key fields.
- *
- * @param parentDescriptor The {@link FieldsDescriptor} object which defines the name and
- * type of all the available key fields.
- * @return A {@link FieldsDescriptor} object which represents this {@link DimensionsDescriptor} (dimensions
- * combination)
- * derived from the given {@link FieldsDescriptor} object.
- */
- public FieldsDescriptor createFieldsDescriptor(FieldsDescriptor parentDescriptor)
- {
- Map<String, Type> fieldToType = Maps.newHashMap();
- Map<String, Type> parentFieldToType = parentDescriptor.getFieldToType();
-
- for (String field : this.fields.getFields()) {
- if (RESERVED_DIMENSION_NAMES.contains(field)) {
- continue;
- }
-
- fieldToType.put(field, parentFieldToType.get(field));
- }
-
- if (timeBucket != null && timeBucket != TimeBucket.ALL) {
- fieldToType.put(DIMENSION_TIME_BUCKET, DIMENSION_TIME_BUCKET_TYPE);
- fieldToType.put(DIMENSION_TIME, Type.LONG);
- }
-
- return new FieldsDescriptor(fieldToType);
- }
-
- @Override
- public int hashCode()
- {
- int hash = 7;
- hash = 83 * hash + (this.customTimeBucket != null ? this.customTimeBucket.hashCode() : 0);
- hash = 83 * hash + (this.fields != null ? this.fields.hashCode() : 0);
- return hash;
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- final DimensionsDescriptor other = (DimensionsDescriptor)obj;
- if (!this.customTimeBucket.equals(other.customTimeBucket)) {
- return false;
- }
- if (this.fields != other.fields && (this.fields == null || !this.fields.equals(other.fields))) {
- return false;
- }
- return true;
- }
-
- @Override
- public String toString()
- {
- return "DimensionsDescriptor{" + "timeBucket=" + customTimeBucket + ", fields=" + fields + '}';
- }
-
- @Override
- public int compareTo(DimensionsDescriptor other)
- {
- if (this == other) {
- return 0;
- }
-
- List<String> thisFieldList = this.getFields().getFieldsList();
- List<String> otherFieldList = other.getFields().getFieldsList();
-
- if (thisFieldList != otherFieldList) {
- int compare = thisFieldList.size() - otherFieldList.size();
-
- if (compare != 0) {
- return compare;
- }
-
- Collections.sort(thisFieldList);
- Collections.sort(otherFieldList);
-
- for (int index = 0; index < thisFieldList.size(); index++) {
- String thisField = thisFieldList.get(index);
- String otherField = otherFieldList.get(index);
-
- int fieldCompare = thisField.compareTo(otherField);
-
- if (fieldCompare != 0) {
- return fieldCompare;
- }
- }
- }
-
- CustomTimeBucket thisBucket = this.getCustomTimeBucket();
- CustomTimeBucket otherBucket = other.getCustomTimeBucket();
-
- if (thisBucket == null && otherBucket == null) {
- return 0;
- } else if (thisBucket != null && otherBucket == null) {
- return 1;
- } else if (thisBucket == null && otherBucket != null) {
- return -1;
- } else {
- return thisBucket.compareTo(otherBucket);
- }
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(DimensionsDescriptor.class);
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java
deleted file mode 100644
index b12b631..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java
+++ /dev/null
@@ -1,844 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions;
-
-import java.io.Serializable;
-import java.util.List;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.dimensions.aggregator.AggregateEvent;
-
-/**
- * <p>
- * This is the base class for the events that are used for internal processing in the subclasses of
- * {@link AbstractDimensionsComputationFlexible} and {@link DimensionsStoreHDHT}.
- * </p>
- * <p>
- * A {@link DimensionsEvent} is constructed from two parts: an {@link EventKey} and a {@link GPOMutable} object
- * which contains the values of aggregate fields. The {@link EventKey} is used to identify the dimension combination
- * an event belongs to, and consequently determines what input values should be aggregated together. The aggregates
- * are the actual data payload of the event which are to be aggregated.
- * </p>
- *
- * @since 3.1.0
- */
-public class DimensionsEvent implements Serializable
-{
- private static final long serialVersionUID = 201503231204L;
-
- /**
- * This is the {@link GPOMutable} object which holds all the aggregates.
- */
- protected GPOMutable aggregates;
- /**
- * This is the event key for the event.
- */
- protected EventKey eventKey;
-
- /**
- * Constructor for Kryo.
- */
- private DimensionsEvent()
- {
- //For kryo
- }
-
- /**
- * This creates a {@link DimensionsEvent} from the given event key and aggregates.
- *
- * @param eventKey The key from which to create a {@link DimensionsEvent}.
- * @param aggregates The aggregates from which to create {@link DimensionsEvent}.
- */
- public DimensionsEvent(EventKey eventKey,
- GPOMutable aggregates)
- {
- setEventKey(eventKey);
- setAggregates(aggregates);
- }
-
- /**
- * Creates a DimensionsEvent with the given key values, aggregates and ids.
- *
- * @param keys The values for fields in the key.
- * @param aggregates The values for fields in the aggregate.
- * @param bucketID The bucketID
- * @param schemaID The schemaID.
- * @param dimensionDescriptorID The dimensionsDescriptorID.
- * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier.
- */
- public DimensionsEvent(GPOMutable keys,
- GPOMutable aggregates,
- int bucketID,
- int schemaID,
- int dimensionDescriptorID,
- int aggregatorIndex)
- {
- this.eventKey = new EventKey(bucketID,
- schemaID,
- dimensionDescriptorID,
- aggregatorIndex,
- keys);
- setAggregates(aggregates);
- }
-
- /**
- * This creates an event with the given data. Note, this constructor assumes that the bucketID will be 0.
- *
- * @param keys The value for fields in the key.
- * @param aggregates The value for fields in the aggregate.
- * @param schemaID The schemaID.
- * @param dimensionDescriptorID The dimensionsDescriptorID.
- * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier.
- */
- public DimensionsEvent(GPOMutable keys,
- GPOMutable aggregates,
- int schemaID,
- int dimensionDescriptorID,
- int aggregatorIndex)
- {
- this.eventKey = new EventKey(schemaID,
- dimensionDescriptorID,
- aggregatorIndex,
- keys);
- setAggregates(aggregates);
- }
-
- /**
- * This is a helper method which sets the {@link EventKey} of the event to
- * be the same as the given {@link EventKey}.
- *
- * @param eventKey The {@link EventKey} to set on this event.
- */
- protected final void setEventKey(EventKey eventKey)
- {
- this.eventKey = new EventKey(eventKey);
- }
-
- /**
- * This is a helper method which sets the aggregates for this event.
- *
- * @param aggregates The aggregates for this event.
- */
- protected final void setAggregates(GPOMutable aggregates)
- {
- Preconditions.checkNotNull(aggregates);
- this.aggregates = aggregates;
- }
-
- /**
- * This is a helper method which returns the aggregates for this event.
- *
- * @return The helper method which returns the aggregates for this event.
- */
- public GPOMutable getAggregates()
- {
- return aggregates;
- }
-
- /**
- * Returns the {@link EventKey} for this event.
- *
- * @return The {@link EventKey} for this event.
- */
- public EventKey getEventKey()
- {
- return eventKey;
- }
-
- /**
- * This is a convenience method which returns the values of the key fields in this event's
- * {@link EventKey}.
- *
- * @return The values of the key fields in this event's {@link EventKey}.
- */
- public GPOMutable getKeys()
- {
- return eventKey.getKey();
- }
-
- /**
- * This is a convenience method which returns the schemaID of this event's {@link EventKey}.
- *
- * @return The schemaID of this event's {@link EventKey}.
- */
- public int getSchemaID()
- {
- return eventKey.getSchemaID();
- }
-
- /**
- * Returns the id of the dimension descriptor (key combination) for which this event contains data.
- *
- * @return The id of the dimension descriptor (key combination) for which this event contains data.
- */
- public int getDimensionDescriptorID()
- {
- return eventKey.getDimensionDescriptorID();
- }
-
- /**
- * Returns the id of the aggregator which is applied to this event's data.
- *
- * @return Returns the id of the aggregator which is applied to this event's data.
- */
- public int getAggregatorID()
- {
- return eventKey.getAggregatorID();
- }
-
- /**
- * Returns the bucketID assigned to this event. The bucketID is useful for this event in the case that the event
- * is sent to a partitioned HDHT operator. Each partitioned HDHT operator can use the bucketIDs for the buckets it
- * writes to as a partition key.
- *
- * @return The bucketID assigned to this event.
- */
- public int getBucketID()
- {
- return eventKey.getBucketID();
- }
-
- /**
- * This is a utility method which copies the given src event to the given destination event.
- *
- * @param aeDest The destination event.
- * @param aeSrc The source event.
- */
- public static void copy(DimensionsEvent aeDest, DimensionsEvent aeSrc)
- {
- GPOMutable destAggs = aeDest.getAggregates();
- GPOMutable srcAggs = aeSrc.getAggregates();
-
- if (srcAggs.getFieldsBoolean() != null) {
- System.arraycopy(srcAggs.getFieldsBoolean(), 0, destAggs.getFieldsBoolean(), 0,
- srcAggs.getFieldsBoolean().length);
- }
-
- if (srcAggs.getFieldsCharacter() != null) {
- System.arraycopy(srcAggs.getFieldsCharacter(), 0, destAggs.getFieldsCharacter(), 0,
- srcAggs.getFieldsCharacter().length);
- }
-
- if (srcAggs.getFieldsString() != null) {
- System.arraycopy(srcAggs.getFieldsString(), 0, destAggs.getFieldsString(), 0, srcAggs.getFieldsString().length);
- }
-
- if (srcAggs.getFieldsShort() != null) {
- System.arraycopy(srcAggs.getFieldsShort(), 0, destAggs.getFieldsShort(), 0, srcAggs.getFieldsShort().length);
- }
-
- if (srcAggs.getFieldsInteger() != null) {
- System.arraycopy(srcAggs.getFieldsInteger(), 0, destAggs.getFieldsInteger(), 0,
- srcAggs.getFieldsInteger().length);
- }
-
- if (srcAggs.getFieldsLong() != null) {
- System.arraycopy(srcAggs.getFieldsLong(), 0, destAggs.getFieldsLong(), 0, srcAggs.getFieldsLong().length);
- }
-
- if (srcAggs.getFieldsFloat() != null) {
- System.arraycopy(srcAggs.getFieldsFloat(), 0, destAggs.getFieldsFloat(), 0, srcAggs.getFieldsFloat().length);
- }
-
- if (srcAggs.getFieldsDouble() != null) {
- System.arraycopy(srcAggs.getFieldsDouble(), 0, destAggs.getFieldsDouble(), 0, srcAggs.getFieldsDouble().length);
- }
- }
-
- /**
- * <p>
- * The {@link EventKey} represents a dimensions combination for a dimensions event. It contains the keys and values
- * which define a dimensions combination. It's very similar to a {@link DimensionsDescriptor} which is also used to
- * define part of a dimensions combination. The difference between the two is that a {@link DimensionsDescriptor}
- * only contains what
- * keys are included in the combination, not the values of those keys (which the {@link EventKey} has.
- * </p>
- * <p>
- * In addition to holding the keys in a dimensions combination and their values, the event key holds some meta
- * information.
- * The meta information included and their purposes are the following:
- * <ul>
- * <li><b>bucketID:</b> This is set when the dimension store responsible for storing the data is partitioned. In that
- * case the bucketID is used as the partitionID.</li>
- * <li><b>schemaID:</b> This is the id of the {@link DimensionalSchema} that this {@link EventKey} corresponds to
- * .</li>
- * <li><b>dimensionDescriptorID:</b> This is the id of the {@link DimensionsDescriptor} that this {@link EventKey}
- * corresponds to.</li>
- * <li><b>aggregatorID:</b> This is the id of the aggregator that is used to aggregate the values associated with
- * this {@link EventKey}
- * in a {@link DimensionsEvent}.</li>
- * </ul>
- * </p>
- */
- public static class EventKey implements Serializable
- {
- private static final long serialVersionUID = 201503231205L;
-
- /**
- * The bucketID assigned to this event key.
- */
- private int bucketID;
- /**
- * The schemaID corresponding to the {@link DimensionalSchema} that this {@link EventKey}
- * corresponds to.
- */
- private int schemaID;
- /**
- * The dimensionsDescriptorID of the {@link DimensionDescriptor} in the corresponding {@link DimensionalSchema}.
- */
- private int dimensionDescriptorID;
- /**
- * The id of the aggregator which should be used to aggregate the values corresponding to this
- * {@link EventKey}.
- */
- private int aggregatorID;
- /**
- * The values of the key fields.
- */
- private GPOMutable key;
-
- /**
- * Constructor for serialization.
- */
- private EventKey()
- {
- //For kryo
- }
-
- /**
- * Copy constructor.
- *
- * @param eventKey The {@link EventKey} whose data will be copied.
- */
- public EventKey(EventKey eventKey)
- {
- this.bucketID = eventKey.bucketID;
- this.schemaID = eventKey.schemaID;
- this.dimensionDescriptorID = eventKey.dimensionDescriptorID;
- this.aggregatorID = eventKey.aggregatorID;
-
- this.key = new GPOMutable(eventKey.getKey());
- }
-
- /**
- * Creates an event key with the given data.
- *
- * @param bucketID The bucketID assigned to this {@link EventKey}.
- * @param schemaID The schemaID of the corresponding {@link DimensionalSchema}.
- * @param dimensionDescriptorID The dimensionDescriptorID of the corresponding
- * {@link DimensionDescriptor} in the {@link DimensionalSchema}.
- * @param aggregatorID The id of the aggregator which should be used to aggregate the values
- * corresponding to this
- * {@link EventKey}.
- * @param key The values of the keys.
- */
- public EventKey(int bucketID,
- int schemaID,
- int dimensionDescriptorID,
- int aggregatorID,
- GPOMutable key)
- {
- setBucketID(bucketID);
- setSchemaID(schemaID);
- setDimensionDescriptorID(dimensionDescriptorID);
- setAggregatorID(aggregatorID);
- setKey(key);
- }
-
- /**
- * Creates an event key with the given data. This constructor assumes that the bucketID will be 0.
- *
- * @param schemaID The schemaID of the corresponding {@link DimensionalSchema}.
- * @param dimensionDescriptorID The dimensionDescriptorID of the corresponding {@link DimensionDescriptor}.
- * @param aggregatorID The id of the aggregator which should be used to aggregate the values
- * corresponding to this
- * {@link EventKey}.
- * @param key The values of the keys.
- */
- public EventKey(int schemaID,
- int dimensionDescriptorID,
- int aggregatorID,
- GPOMutable key)
- {
- setSchemaID(schemaID);
- setDimensionDescriptorID(dimensionDescriptorID);
- setAggregatorID(aggregatorID);
- setKey(key);
- }
-
- /**
- * Sets the dimension descriptor ID.
- *
- * @param dimensionDescriptorID The dimension descriptor ID to set.
- */
- private void setDimensionDescriptorID(int dimensionDescriptorID)
- {
- this.dimensionDescriptorID = dimensionDescriptorID;
- }
-
- /**
- * Returns the dimension descriptor ID.
- *
- * @return The dimension descriptor ID.
- */
- public int getDimensionDescriptorID()
- {
- return dimensionDescriptorID;
- }
-
- /**
- * Returns the aggregatorID.
- *
- * @return The aggregatorID.
- */
- public int getAggregatorID()
- {
- return aggregatorID;
- }
-
- /**
- * Sets the aggregatorID.
- *
- * @param aggregatorID The aggregatorID to set.
- */
- private void setAggregatorID(int aggregatorID)
- {
- this.aggregatorID = aggregatorID;
- }
-
- /**
- * Returns the schemaID.
- *
- * @return The schemaID to set.
- */
- public int getSchemaID()
- {
- return schemaID;
- }
-
- /**
- * Sets the schemaID.
- *
- * @param schemaID The schemaID to set.
- */
- private void setSchemaID(int schemaID)
- {
- this.schemaID = schemaID;
- }
-
- /**
- * Returns the key values.
- *
- * @return The key values.
- */
- public GPOMutable getKey()
- {
- return key;
- }
-
- /**
- * Sets the bucektID.
- *
- * @param bucketID The bucketID.
- */
- private void setBucketID(int bucketID)
- {
- this.bucketID = bucketID;
- }
-
- /**
- * Gets the bucketID.
- *
- * @return The bucketID.
- */
- public int getBucketID()
- {
- return bucketID;
- }
-
- /**
- * Sets the key values.
- *
- * @param key The key values to set.
- */
- private void setKey(GPOMutable key)
- {
- Preconditions.checkNotNull(key);
- this.key = key;
- }
-
- @Override
- public int hashCode()
- {
- int hash = 3;
- hash = 97 * hash + this.bucketID;
- hash = 97 * hash + this.schemaID;
- hash = 97 * hash + this.dimensionDescriptorID;
- hash = 97 * hash + this.aggregatorID;
- hash = 97 * hash + (this.key != null ? this.key.hashCode() : 0);
- return hash;
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (obj == null) {
- return false;
- }
-
- if (getClass() != obj.getClass()) {
- return false;
- }
-
- final EventKey other = (EventKey)obj;
-
- if (this.bucketID != other.bucketID) {
- return false;
- }
-
- if (this.schemaID != other.schemaID) {
- return false;
- }
-
- if (this.dimensionDescriptorID != other.dimensionDescriptorID) {
- return false;
- }
-
- if (this.aggregatorID != other.aggregatorID) {
- return false;
- }
-
- if (this.key != other.key && (this.key == null || !this.key.equals(other.key))) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public String toString()
- {
- return "EventKey{" + "schemaID=" + schemaID + ", dimensionDescriptorID=" + dimensionDescriptorID +
- ", aggregatorIndex=" + aggregatorID + ", key=" + key + '}';
- }
-
- public static List<EventKey> createEventKeys(int schemaId,
- int dimensionsDescriptorId,
- int aggregatorId,
- List<GPOMutable> keys)
- {
- List<EventKey> eventKeys = Lists.newArrayList();
-
- for (GPOMutable key : keys) {
- eventKeys.add(new EventKey(schemaId, dimensionsDescriptorId, aggregatorId, key));
- }
-
- return eventKeys;
- }
- }
-
- @Override
- public int hashCode()
- {
- int hash = 5;
- hash = 79 * hash + (this.aggregates != null ? this.aggregates.hashCode() : 0);
- hash = 79 * hash + (this.eventKey != null ? this.eventKey.hashCode() : 0);
- return hash;
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- final DimensionsEvent other = (DimensionsEvent)obj;
- if (this.aggregates != other.aggregates && (this.aggregates == null || !this.aggregates.equals(other.aggregates))) {
- return false;
- }
- if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) {
- return false;
- }
- return true;
- }
-
- public static class InputEvent extends DimensionsEvent
- {
- private static final long serialVersionUID = 201506210406L;
- public boolean used = false;
-
- private InputEvent()
- {
- }
-
- /**
- * This creates a {@link DimensionsEvent} from the given event key and aggregates.
- *
- * @param eventKey The key from which to create a {@link DimensionsEvent}.
- * @param aggregates The aggregates from which to create {@link DimensionsEvent}.
- */
- public InputEvent(EventKey eventKey,
- GPOMutable aggregates)
- {
- setEventKey(eventKey);
- setAggregates(aggregates);
- }
-
- /**
- * Creates a DimensionsEvent with the given key values, aggregates and ids.
- *
- * @param keys The values for fields in the key.
- * @param aggregates The values for fields in the aggregate.
- * @param bucketID The bucketID
- * @param schemaID The schemaID.
- * @param dimensionDescriptorID The dimensionsDescriptorID.
- * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier.
- */
- public InputEvent(GPOMutable keys,
- GPOMutable aggregates,
- int bucketID,
- int schemaID,
- int dimensionDescriptorID,
- int aggregatorIndex)
- {
- this.eventKey = new EventKey(bucketID,
- schemaID,
- dimensionDescriptorID,
- aggregatorIndex,
- keys);
- setAggregates(aggregates);
- }
-
- /**
- * This creates an event with the given data. Note, this constructor assumes that the bucketID will be 0.
- *
- * @param keys The value for fields in the key.
- * @param aggregates The value for fields in the aggregate.
- * @param schemaID The schemaID.
- * @param dimensionDescriptorID The dimensionsDescriptorID.
- * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier.
- */
- public InputEvent(GPOMutable keys,
- GPOMutable aggregates,
- int schemaID,
- int dimensionDescriptorID,
- int aggregatorIndex)
- {
- this.eventKey = new EventKey(schemaID,
- dimensionDescriptorID,
- aggregatorIndex,
- keys);
- setAggregates(aggregates);
- }
-
- @Override
- public int hashCode()
- {
- return GPOUtils.hashcode(this.getKeys());
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- final DimensionsEvent other = (DimensionsEvent)obj;
-
- if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) {
- return false;
- }
- return true;
- }
- }
-
- public static class Aggregate extends DimensionsEvent implements AggregateEvent
- {
- private static final long serialVersionUID = 201506190110L;
-
- /**
- * This is the aggregatorIndex assigned to this event.
- */
- protected int aggregatorIndex;
- private GPOMutable metaData;
-
- public Aggregate()
- {
- //for kryo and for extending classes
- }
-
- /**
- * This creates a {@link DimensionsEvent} from the given event key and aggregates.
- *
- * @param eventKey The key from which to create a {@link DimensionsEvent}.
- * @param aggregates The aggregates from which to create {@link DimensionsEvent}.
- */
- public Aggregate(EventKey eventKey,
- GPOMutable aggregates)
- {
- setEventKey(eventKey);
- setAggregates(aggregates);
- }
-
- public Aggregate(EventKey eventKey,
- GPOMutable aggregates,
- GPOMutable metaData)
- {
- super(eventKey,
- aggregates);
-
- this.metaData = metaData;
- }
-
- /**
- * Creates a DimensionsEvent with the given key values, aggregates and ids.
- *
- * @param keys The values for fields in the key.
- * @param aggregates The values for fields in the aggregate.
- * @param bucketID The bucketID
- * @param schemaID The schemaID.
- * @param dimensionDescriptorID The dimensionsDescriptorID.
- * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier.
- */
- public Aggregate(GPOMutable keys,
- GPOMutable aggregates,
- int bucketID,
- int schemaID,
- int dimensionDescriptorID,
- int aggregatorIndex)
- {
- this.eventKey = new EventKey(bucketID,
- schemaID,
- dimensionDescriptorID,
- aggregatorIndex,
- keys);
- setAggregates(aggregates);
- }
-
- public Aggregate(GPOMutable keys,
- GPOMutable aggregates,
- GPOMutable metaData,
- int bucketID,
- int schemaID,
- int dimensionDescriptorID,
- int aggregatorIndex)
- {
- this(keys,
- aggregates,
- bucketID,
- schemaID,
- dimensionDescriptorID,
- aggregatorIndex);
-
- this.metaData = metaData;
- }
-
- /**
- * This creates an event with the given data. Note, this constructor assumes that the bucketID will be 0.
- *
- * @param keys The value for fields in the key.
- * @param aggregates The value for fields in the aggregate.
- * @param schemaID The schemaID.
- * @param dimensionDescriptorID The dimensionsDescriptorID.
- * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier.
- */
- public Aggregate(GPOMutable keys,
- GPOMutable aggregates,
- int schemaID,
- int dimensionDescriptorID,
- int aggregatorIndex)
- {
- this.eventKey = new EventKey(schemaID,
- dimensionDescriptorID,
- aggregatorIndex,
- keys);
- setAggregates(aggregates);
- }
-
- public Aggregate(GPOMutable keys,
- GPOMutable aggregates,
- GPOMutable metaData,
- int schemaID,
- int dimensionDescriptorID,
- int aggregatorIndex)
- {
- this(keys,
- aggregates,
- schemaID,
- dimensionDescriptorID,
- aggregatorIndex);
-
- this.metaData = metaData;
- }
-
- public void setMetaData(GPOMutable metaData)
- {
- this.metaData = metaData;
- }
-
- public GPOMutable getMetaData()
- {
- return metaData;
- }
-
- public void setAggregatorIndex(int aggregatorIndex)
- {
- this.aggregatorIndex = aggregatorIndex;
- }
-
- @Override
- public int getAggregatorIndex()
- {
- return aggregatorIndex;
- }
-
- @Override
- public int hashCode()
- {
- return GPOUtils.hashcode(this.getKeys());
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- final DimensionsEvent other = (DimensionsEvent)obj;
-
- if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) {
- return false;
- }
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java
deleted file mode 100644
index f92bfbf..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
-import com.datatorrent.lib.dimensions.DimensionsConversionContext;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-
-/**
- * * <p>
- * {@link IncrementalAggregator}s perform aggregations in place, on a field by field basis. For example if we have a
- * field cost, an incremental aggregator would take a new value of cost and aggregate it to an aggregate value for
- * cost. No fields except the cost field are used in the computation of the cost aggregation in the case of an
- * {@link IncrementalAggregator}.
- * </p>
- * <p>
- * {@link IncrementalAggregator}s are intended to be used with subclasses of
- * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema}. The way in which
- * {@link IncrementalAggregator}s are used in this context is that a batch of fields to be aggregated by the aggregator
- * are provided in the form of an {@link InputEvent}. For example, if there are two fields (cost and revenue), which
- * will be aggregated by a sum aggregator, both of those fields will be included in the {@link InputEvent} passed to
- * the sum aggregator. And the {DimensionsEventregate} event produced by the sum aggregator will contain two fields,
- * one for cost and one for revenue.
- * </p>
- *
- */
-public abstract class AbstractIncrementalAggregator implements IncrementalAggregator
-{
- private static final long serialVersionUID = 201506211153L;
-
- /**
- * The conversion context for this aggregator.
- */
- protected DimensionsConversionContext context;
-
- public AbstractIncrementalAggregator()
- {
- }
-
- @Override
- public void setDimensionsConversionContext(DimensionsConversionContext context)
- {
- this.context = Preconditions.checkNotNull(context);
- }
-
- @Override
- public Aggregate getGroup(InputEvent src, int aggregatorIndex)
- {
- src.used = true;
- Aggregate aggregate = createAggregate(src,
- context,
- aggregatorIndex);
- return aggregate;
- }
-
- @Override
- public int hashCode(InputEvent inputEvent)
- {
- long timestamp = -1L;
- boolean hasTime = this.context.inputTimestampIndex != -1
- && this.context.outputTimebucketIndex != -1;
-
- if (hasTime) {
- timestamp = inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex];
- inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex]
- = this.context.dd.getCustomTimeBucket().roundDown(timestamp);
- }
-
- int hashCode = GPOUtils.indirectHashcode(inputEvent.getKeys(), context.indexSubsetKeys);
-
- if (hasTime) {
- inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex] = timestamp;
- }
-
- return hashCode;
- }
-
- @Override
- public boolean equals(InputEvent inputEvent1, InputEvent inputEvent2)
- {
- long timestamp1 = 0;
- long timestamp2 = 0;
-
- if (context.inputTimestampIndex != -1) {
- timestamp1 = inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex];
- inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex] =
- context.dd.getCustomTimeBucket().roundDown(timestamp1);
-
- timestamp2 = inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex];
- inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex] =
- context.dd.getCustomTimeBucket().roundDown(timestamp2);
- }
-
- boolean equals = GPOUtils.subsetEquals(inputEvent2.getKeys(),
- inputEvent1.getKeys(),
- context.indexSubsetKeys);
-
- if (context.inputTimestampIndex != -1) {
- inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp1;
- inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp2;
- }
-
- return equals;
- }
-
- /**
- * Creates an {@link Aggregate} from the given {@link InputEvent}.
- *
- * @param inputEvent The {@link InputEvent} to unpack into an {@link Aggregate}.
- * @param context The conversion context required to transform the {@link InputEvent} into
- * the correct {@link Aggregate}.
- * @param aggregatorIndex The aggregatorIndex assigned to this {@link Aggregate}.
- * @return The converted {@link Aggregate}.
- */
- public static Aggregate createAggregate(InputEvent inputEvent,
- DimensionsConversionContext context,
- int aggregatorIndex)
- {
- GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor);
- EventKey eventKey = createEventKey(inputEvent,
- context,
- aggregatorIndex);
-
- Aggregate aggregate = new Aggregate(eventKey,
- aggregates);
- aggregate.setAggregatorIndex(aggregatorIndex);
-
- return aggregate;
- }
-
- /**
- * Creates an {@link EventKey} from the given {@link InputEvent}.
- *
- * @param inputEvent The {@link InputEvent} to extract an {@link EventKey} from.
- * @param context The conversion context required to extract the {@link EventKey} from
- * the given {@link InputEvent}.
- * @param aggregatorIndex The aggregatorIndex to assign to this {@link InputEvent}.
- * @return The {@link EventKey} extracted from the given {@link InputEvent}.
- */
- public static EventKey createEventKey(InputEvent inputEvent,
- DimensionsConversionContext context,
- int aggregatorIndex)
- {
- GPOMutable keys = new GPOMutable(context.keyDescriptor);
- GPOUtils.indirectCopy(keys, inputEvent.getKeys(), context.indexSubsetKeys);
-
- if (context.outputTimebucketIndex >= 0) {
- CustomTimeBucket timeBucket = context.dd.getCustomTimeBucket();
-
- keys.getFieldsInteger()[context.outputTimebucketIndex] = context.customTimeBucketRegistry.getTimeBucketId(
- timeBucket);
- keys.getFieldsLong()[context.outputTimestampIndex] =
- timeBucket.roundDown(inputEvent.getKeys().getFieldsLong()[context.inputTimestampIndex]);
- }
-
- EventKey eventKey = new EventKey(context.schemaID,
- context.dimensionsDescriptorID,
- context.aggregatorID,
- keys);
-
- return eventKey;
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(AbstractIncrementalAggregator.class);
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregateEvent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregateEvent.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregateEvent.java
deleted file mode 100644
index e8f2f3e..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregateEvent.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import it.unimi.dsi.fastutil.Hash;
-
-/**
- * @since 3.3.0
- */
-public interface AggregateEvent
-{
- int getAggregatorIndex();
-
- public static interface Aggregator<EVENT, AGGREGATE extends AggregateEvent> extends Hash.Strategy<EVENT>
- {
- AGGREGATE getGroup(EVENT src, int aggregatorIndex);
-
- void aggregate(AGGREGATE dest, EVENT src);
-
- void aggregate(AGGREGATE dest, AGGREGATE src);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java
deleted file mode 100644
index c15bf25..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import java.util.List;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import com.datatorrent.api.annotation.Name;
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.schemas.Fields;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-
-/**
- * This is the average {@link OTFAggregator}.
- *
- * @since 3.1.0
- */
-@Name("AVG")
-public class AggregatorAverage implements OTFAggregator
-{
- private static final long serialVersionUID = 20154301644L;
-
- /**
- * The array index of the sum aggregates in the argument list of the {@link #aggregate} function.
- */
- public static int SUM_INDEX = 0;
- /**
- * The array index of the count aggregates in the argument list of the {@link #aggregate} function.
- */
- public static int COUNT_INDEX = 1;
- /**
- * The singleton instance of this class.
- */
- public static final AggregatorAverage INSTANCE = new AggregatorAverage();
-
- /**
- * The list of {@link IncrementalAggregator}s that this {@link OTFAggregator} depends on.
- */
- public static final transient List<Class<? extends IncrementalAggregator>> CHILD_AGGREGATORS =
- ImmutableList.of(AggregatorIncrementalType.SUM.getAggregator().getClass(),
- AggregatorIncrementalType.COUNT.getAggregator().getClass());
-
- /**
- * Constructor for singleton pattern.
- */
- protected AggregatorAverage()
- {
- //Do nothing
- }
-
- @Override
- public List<Class<? extends IncrementalAggregator>> getChildAggregators()
- {
- return CHILD_AGGREGATORS;
- }
-
- @Override
- public GPOMutable aggregate(GPOMutable... aggregates)
- {
- Preconditions.checkArgument(aggregates.length == getChildAggregators().size(),
- "The number of arguments " + aggregates.length +
- " should be the same as the number of child aggregators " + getChildAggregators().size());
-
- GPOMutable sumAggregation = aggregates[SUM_INDEX];
- GPOMutable countAggregation = aggregates[COUNT_INDEX];
-
- FieldsDescriptor fieldsDescriptor = sumAggregation.getFieldDescriptor();
- Fields fields = fieldsDescriptor.getFields();
- GPOMutable result = new GPOMutable(AggregatorUtils.getOutputFieldsDescriptor(fields, this));
-
- long count = countAggregation.getFieldsLong()[0];
-
- for (String field : fields.getFields()) {
- Type type = sumAggregation.getFieldDescriptor().getType(field);
-
- switch (type) {
- case BYTE: {
- double val = ((double)sumAggregation.getFieldByte(field)) /
- ((double)count);
- result.setField(field, val);
- break;
- }
- case SHORT: {
- double val = ((double)sumAggregation.getFieldShort(field)) /
- ((double)count);
- result.setField(field, val);
- break;
- }
- case INTEGER: {
- double val = ((double)sumAggregation.getFieldInt(field)) /
- ((double)count);
- result.setField(field, val);
- break;
- }
- case LONG: {
- double val = ((double)sumAggregation.getFieldLong(field)) /
- ((double)count);
- result.setField(field, val);
- break;
- }
- case FLOAT: {
- double val = sumAggregation.getFieldFloat(field) /
- ((double)count);
- result.setField(field, val);
- break;
- }
- case DOUBLE: {
- double val = sumAggregation.getFieldDouble(field) /
- ((double)count);
- result.setField(field, val);
- break;
- }
- default: {
- throw new UnsupportedOperationException("The type " + type + " is not supported.");
- }
- }
- }
-
- return result;
- }
-
- @Override
- public Type getOutputType()
- {
- return Type.DOUBLE;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java
deleted file mode 100644
index 8566e1c..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import java.util.Collections;
-import java.util.Map;
-
-import com.google.common.collect.Maps;
-
-import com.datatorrent.api.annotation.Name;
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-
-/**
- * This {@link IncrementalAggregator} performs a count of the number of times an input is encountered.
- *
- * @since 3.1.0
- */
-@Name("COUNT")
-public class AggregatorCount extends AbstractIncrementalAggregator
-{
- private static final long serialVersionUID = 20154301645L;
-
- /**
- * This is a map whose keys represent input types and whose values
- * represent the corresponding output types.
- */
- public static final transient Map<Type, Type> TYPE_CONVERSION_MAP;
-
- static {
- Map<Type, Type> typeConversionMap = Maps.newHashMap();
-
- for (Type type : Type.values()) {
- typeConversionMap.put(type, Type.LONG);
- }
-
- TYPE_CONVERSION_MAP = Collections.unmodifiableMap(typeConversionMap);
- }
-
- public AggregatorCount()
- {
- //Do nothing
- }
-
- @Override
- public Aggregate getGroup(InputEvent src, int aggregatorIndex)
- {
- src.used = true;
- GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor);
- GPOMutable keys = new GPOMutable(context.keyDescriptor);
- GPOUtils.indirectCopy(keys, src.getKeys(), context.indexSubsetKeys);
-
- EventKey eventKey = createEventKey(src,
- context,
- aggregatorIndex);
-
- long[] longFields = aggregates.getFieldsLong();
-
- for (int index = 0;
- index < longFields.length;
- index++) {
- longFields[index] = 0;
- }
-
- return new Aggregate(eventKey,
- aggregates);
- }
-
- @Override
- public void aggregate(Aggregate dest, InputEvent src)
- {
- long[] fieldsLong = dest.getAggregates().getFieldsLong();
-
- for (int index = 0;
- index < fieldsLong.length;
- index++) {
- //increment count
- fieldsLong[index]++;
- }
- }
-
- @Override
- public void aggregate(Aggregate destAgg, Aggregate srcAgg)
- {
- long[] destLongs = destAgg.getAggregates().getFieldsLong();
- long[] srcLongs = srcAgg.getAggregates().getFieldsLong();
-
- for (int index = 0;
- index < destLongs.length;
- index++) {
- //aggregate count
- destLongs[index] += srcLongs[index];
- }
- }
-
- @Override
- public Type getOutputType(Type inputType)
- {
- return TYPE_CONVERSION_MAP.get(inputType);
- }
-
- @Override
- public FieldsDescriptor getMetaDataDescriptor()
- {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java
deleted file mode 100644
index f5924b8..0000000
--- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions.aggregator;
-
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import com.datatorrent.api.annotation.Name;
-import com.datatorrent.lib.appdata.gpo.GPOMutable;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.lib.appdata.gpo.Serde;
-import com.datatorrent.lib.appdata.gpo.SerdeFieldsDescriptor;
-import com.datatorrent.lib.appdata.gpo.SerdeListGPOMutable;
-import com.datatorrent.lib.appdata.gpo.SerdeObjectPayloadFix;
-import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
-import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
-
-@Name("CUM_SUM")
-/**
- * @since 3.1.0
- */
-
-public class AggregatorCumSum extends AggregatorSum
-{
- private static final long serialVersionUID = 201506280518L;
-
- public static final int KEY_FD_INDEX = 0;
- public static final int AGGREGATE_FD_INDEX = 1;
- public static final int KEYS_INDEX = 2;
- public static final int AGGREGATES_INDEX = 3;
-
- public static final FieldsDescriptor META_DATA_FIELDS_DESCRIPTOR;
-
- static {
- Map<String, Type> fieldToType = Maps.newHashMap();
- fieldToType.put("fdkeys", Type.OBJECT);
- fieldToType.put("fdvalues", Type.OBJECT);
- fieldToType.put("keys", Type.OBJECT);
- fieldToType.put("values", Type.OBJECT);
-
- Map<String, Serde> fieldToSerde = Maps.newHashMap();
- fieldToSerde.put("fdkeys", SerdeFieldsDescriptor.INSTANCE);
- fieldToSerde.put("fdvalues", SerdeFieldsDescriptor.INSTANCE);
- fieldToSerde.put("keys", SerdeListGPOMutable.INSTANCE);
- fieldToSerde.put("values", SerdeListGPOMutable.INSTANCE);
-
- META_DATA_FIELDS_DESCRIPTOR = new FieldsDescriptor(fieldToType,
- fieldToSerde,
- new PayloadFix());
- }
-
- public AggregatorCumSum()
- {
- }
-
- @Override
- public Aggregate getGroup(InputEvent src, int aggregatorIndex)
- {
- src.used = true;
- Aggregate agg = createAggregate(src,
- context,
- aggregatorIndex);
-
- GPOUtils.indirectCopy(agg.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
-
- GPOMutable metaData = new GPOMutable(getMetaDataDescriptor());
-
- GPOMutable fullKey = new GPOMutable(src.getKeys());
-
- if (context.inputTimestampIndex >= 0) {
- fullKey.getFieldsLong()[context.inputTimestampIndex] = -1L;
- }
-
- List<GPOMutable> keys = Lists.newArrayList(fullKey);
-
- GPOMutable value = new GPOMutable(agg.getAggregates());
- List<GPOMutable> values = Lists.newArrayList(value);
-
- metaData.getFieldsObject()[KEY_FD_INDEX] = fullKey.getFieldDescriptor();
- metaData.getFieldsObject()[AGGREGATE_FD_INDEX] = value.getFieldDescriptor();
- metaData.getFieldsObject()[KEYS_INDEX] = keys;
- metaData.getFieldsObject()[AGGREGATES_INDEX] = values;
- agg.setMetaData(metaData);
-
- return agg;
- }
-
- @Override
- public void aggregate(Aggregate dest, InputEvent src)
- {
- @SuppressWarnings("unchecked")
- List<GPOMutable> destKeys =
- (List<GPOMutable>)dest.getMetaData().getFieldsObject()[KEYS_INDEX];
-
- @SuppressWarnings("unchecked")
- List<GPOMutable> destAggregates =
- (List<GPOMutable>)dest.getMetaData().getFieldsObject()[AGGREGATES_INDEX];
-
- long timestamp = 0L;
-
- if (context.inputTimestampIndex >= 0) {
- timestamp = src.getKeys().getFieldsLong()[context.inputTimestampIndex];
- src.getKeys().getFieldsLong()[context.inputTimestampIndex] = -1L;
- }
-
- if (!contains(destKeys, src.getKeys())) {
- destKeys.add(new GPOMutable(src.getKeys()));
-
- GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor);
- GPOUtils.indirectCopy(aggregates, src.getAggregates(), context.indexSubsetAggregates);
-
- destAggregates.add(aggregates);
-
- this.aggregateAggs(dest.getAggregates(), aggregates);
- }
-
- if (context.inputTimestampIndex >= 0) {
- src.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp;
- }
- }
-
- @Override
- public void aggregate(Aggregate dest, Aggregate src)
- {
- dest.getMetaData().applyObjectPayloadFix();
- src.getMetaData().applyObjectPayloadFix();
-
- @SuppressWarnings("unchecked")
- List<GPOMutable> destKeys =
- (List<GPOMutable>)dest.getMetaData().getFieldsObject()[KEYS_INDEX];
-
- @SuppressWarnings("unchecked")
- List<GPOMutable> srcKeys =
- (List<GPOMutable>)src.getMetaData().getFieldsObject()[KEYS_INDEX];
-
- @SuppressWarnings("unchecked")
- List<GPOMutable> destAggregates =
- (List<GPOMutable>)dest.getMetaData().getFieldsObject()[AGGREGATES_INDEX];
-
- @SuppressWarnings("unchecked")
- List<GPOMutable> srcAggregates =
- (List<GPOMutable>)src.getMetaData().getFieldsObject()[AGGREGATES_INDEX];
-
- List<GPOMutable> newKeys = Lists.newArrayList();
- List<GPOMutable> newAggs = Lists.newArrayList();
-
- for (int index = 0;
- index < srcKeys.size();
- index++) {
- GPOMutable currentSrcKey = srcKeys.get(index);
- GPOMutable currentSrcAgg = srcAggregates.get(index);
-
- if (!contains(destKeys, currentSrcKey)) {
- newKeys.add(currentSrcKey);
- newAggs.add(currentSrcAgg);
-
- this.aggregateAggs(dest.getAggregates(), currentSrcAgg);
- }
- }
-
- destKeys.addAll(newKeys);
- destAggregates.addAll(newAggs);
- }
-
- private boolean contains(List<GPOMutable> mutables, GPOMutable mutable)
- {
- for (int index = 0;
- index < mutables.size();
- index++) {
- GPOMutable mutableFromList = mutables.get(index);
-
- if (GPOUtils.equals(mutableFromList, mutable)) {
- return true;
- }
- }
-
- return false;
- }
-
- @Override
- public FieldsDescriptor getMetaDataDescriptor()
- {
- return META_DATA_FIELDS_DESCRIPTOR;
- }
-
- public static class PayloadFix implements SerdeObjectPayloadFix
- {
- @Override
- public void fix(Object[] objects)
- {
- FieldsDescriptor keyfd = (FieldsDescriptor)objects[KEY_FD_INDEX];
- FieldsDescriptor valuefd = (FieldsDescriptor)objects[AGGREGATE_FD_INDEX];
-
- @SuppressWarnings("unchecked")
- List<GPOMutable> keyMutables = (List<GPOMutable>)objects[KEYS_INDEX];
- @SuppressWarnings("unchecked")
- List<GPOMutable> aggregateMutables = (List<GPOMutable>)objects[AGGREGATES_INDEX];
-
- fix(keyfd, keyMutables);
- fix(valuefd, aggregateMutables);
- }
-
- private void fix(FieldsDescriptor fd, List<GPOMutable> mutables)
- {
- for (int index = 0;
- index < mutables.size();
- index++) {
- mutables.get(index).setFieldDescriptor(fd);
- }
- }
- }
-}