You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 20:12:25 UTC
[08/50] [abbrv] incubator-apex-malhar git commit: MLHR-1944 #resolve
#comment added dimensions helper classes
MLHR-1944 #resolve #comment added dimensions helper classes
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a4718f02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a4718f02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a4718f02
Branch: refs/heads/master
Commit: a4718f02434736bf6d9e5f98d01e923e950503f5
Parents: c31c5ea 691ee6f
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Tue Dec 15 12:45:13 2015 -0800
Committer: brightchen <br...@datatorrent.com>
Committed: Tue Dec 15 16:47:53 2015 -0800
----------------------------------------------------------------------
.../dimensions/CustomTimeBucketRegistry.java | 138 ++++++
.../dimensions/DimensionsConversionContext.java | 114 +++++
.../lib/dimensions/DimensionsDescriptor.java | 420 +++++++++++++++++++
.../dimensions/aggregator/AggregateEvent.java | 35 ++
.../CustomTimeBucketRegistryTest.java | 87 ++++
.../dimensions/DimensionsDescriptorTest.java | 101 +++++
6 files changed, 895 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a4718f02/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java
----------------------------------------------------------------------
diff --cc library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java
index 0000000,0000000..fc11647
new file mode 100644
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java
@@@ -1,0 -1,0 +1,138 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package com.datatorrent.lib.dimensions;
++
++import java.io.Serializable;
++import java.util.HashMap;
++import java.util.Map;
++
++import com.google.common.base.Preconditions;
++
++import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
++
++import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
++import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
++import it.unimi.dsi.fastutil.objects.Object2IntMap;
++import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
++
++/**
++ * A registry which assigns integer IDs to {@link CustomTimeBucket}s. A registered time bucket can be looked up
++ * by its ID, by the bucket itself (to obtain its ID), or by its text ({@link CustomTimeBucket#getText()}).
++ */
++public class CustomTimeBucketRegistry implements Serializable
++{
++  private static final long serialVersionUID = 201509221536L;
++
++  /**
++   * The ID handed out by the next call to {@link #register(CustomTimeBucket)}. It is kept strictly greater
++   * than every ID registered through {@link #register(CustomTimeBucket, int)}.
++   */
++  private int currentId;
++
++  private Int2ObjectMap<CustomTimeBucket> idToTimeBucket = new Int2ObjectOpenHashMap<>();
++  private Object2IntMap<CustomTimeBucket> timeBucketToId = new Object2IntOpenHashMap<>();
++  private Map<String, CustomTimeBucket> textToTimeBucket = new HashMap<>();
++
++  /**
++   * Creates an empty registry whose automatically assigned IDs start at 0.
++   */
++  public CustomTimeBucketRegistry()
++  {
++  }
++
++  /**
++   * Creates an empty registry whose automatically assigned IDs start at {@code startingId}.
++   *
++   * @param startingId The first ID handed out by {@link #register(CustomTimeBucket)}.
++   */
++  public CustomTimeBucketRegistry(int startingId)
++  {
++    this.currentId = startingId;
++  }
++
++  /**
++   * Creates a registry pre-populated with the given ID to time bucket mapping. Automatically assigned IDs
++   * start one past the largest ID in the mapping, or at 0 when the mapping is empty.
++   * <p>
++   * The given map is used directly as this registry's ID to time bucket storage (it is not copied), so later
++   * registrations are also visible through it.
++   * </p>
++   *
++   * @param idToTimeBucket The initial mapping from ID to time bucket.
++   * @throws NullPointerException if {@code idToTimeBucket} or any of its values is null.
++   * @throws IllegalArgumentException if the same time bucket is mapped to more than one ID.
++   */
++  public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket)
++  {
++    int maxId = initialize(idToTimeBucket);
++
++    this.idToTimeBucket = idToTimeBucket;
++    // Continue numbering after the pre-registered IDs so register(timeBucket) cannot collide with them.
++    this.currentId = idToTimeBucket.isEmpty() ? 0 : maxId + 1;
++  }
++
++  /**
++   * Creates a registry pre-populated with the given ID to time bucket mapping, whose automatically assigned
++   * IDs start at {@code startingId}. The given map is used directly (it is not copied).
++   *
++   * @param idToTimeBucket The initial mapping from ID to time bucket.
++   * @param startingId The first ID handed out by {@link #register(CustomTimeBucket)}. It must be larger than
++   * every ID in {@code idToTimeBucket}.
++   * @throws NullPointerException if {@code idToTimeBucket} or any of its values is null.
++   * @throws IllegalArgumentException if {@code startingId} is not larger than every ID in the mapping, or if the
++   * same time bucket is mapped to more than one ID.
++   */
++  public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket,
++                                  int startingId)
++  {
++    int maxId = initialize(idToTimeBucket);
++
++    Preconditions.checkArgument(maxId < startingId, "The startingId " + startingId
++        + " must be larger than the largest ID " + maxId
++        + " in the given idToTimeBucket mapping");
++
++    this.idToTimeBucket = idToTimeBucket;
++    this.currentId = startingId;
++  }
++
++  /**
++   * Populates the reverse lookup maps from the given ID to time bucket mapping.
++   *
++   * @param idToTimeBucket The mapping to index.
++   * @return The largest ID in the mapping, or {@link Integer#MIN_VALUE} if the mapping is empty.
++   */
++  private int initialize(Int2ObjectMap<CustomTimeBucket> idToTimeBucket)
++  {
++    Preconditions.checkNotNull(idToTimeBucket);
++
++    int maxId = Integer.MIN_VALUE;
++
++    for (int timeBucketId : idToTimeBucket.keySet()) {
++      // Validate before dereferencing, so a null value fails with a clear message instead of an NPE in getText().
++      CustomTimeBucket customTimeBucket = Preconditions.checkNotNull(idToTimeBucket.get(timeBucketId),
++          "The time bucket mapped to ID %s is null", timeBucketId);
++      // A bucket under two IDs would make the reverse lookup ambiguous; register() rejects this as well.
++      Preconditions.checkArgument(!timeBucketToId.containsKey(customTimeBucket),
++          "The timeBucket %s is mapped to more than one ID", customTimeBucket);
++
++      maxId = Math.max(maxId, timeBucketId);
++      timeBucketToId.put(customTimeBucket, timeBucketId);
++      textToTimeBucket.put(customTimeBucket.getText(), customTimeBucket);
++    }
++
++    return maxId;
++  }
++
++  /**
++   * @param timeBucketId The ID to look up.
++   * @return The time bucket registered under the given ID, or null if there is none.
++   */
++  public CustomTimeBucket getTimeBucket(int timeBucketId)
++  {
++    return idToTimeBucket.get(timeBucketId);
++  }
++
++  /**
++   * @param timeBucket The time bucket to look up.
++   * @return The ID the given time bucket is registered under, or null if it is not registered.
++   */
++  public Integer getTimeBucketId(CustomTimeBucket timeBucket)
++  {
++    if (!timeBucketToId.containsKey(timeBucket)) {
++      return null;
++    }
++
++    return timeBucketToId.get(timeBucket);
++  }
++
++  /**
++   * @param text The text of a time bucket, as returned by {@link CustomTimeBucket#getText()}.
++   * @return The registered time bucket with the given text, or null if there is none.
++   */
++  public CustomTimeBucket getTimeBucket(String text)
++  {
++    return textToTimeBucket.get(text);
++  }
++
++  /**
++   * Registers the given time bucket under the next automatically assigned ID.
++   *
++   * @param timeBucket The time bucket to register.
++   * @throws NullPointerException if {@code timeBucket} is null.
++   * @throws IllegalArgumentException if the time bucket or the next ID is already registered.
++   */
++  public void register(CustomTimeBucket timeBucket)
++  {
++    register(timeBucket, currentId);
++  }
++
++  /**
++   * Registers the given time bucket under the given ID. If the ID is not below the next automatically assigned
++   * ID, automatic assignment continues from {@code timeBucketId + 1}.
++   *
++   * @param timeBucket The time bucket to register.
++   * @param timeBucketId The ID to register the time bucket under.
++   * @throws NullPointerException if {@code timeBucket} is null.
++   * @throws IllegalArgumentException if the time bucket or the ID is already registered.
++   */
++  public void register(CustomTimeBucket timeBucket, int timeBucketId)
++  {
++    // Fail before touching any map so a null bucket cannot leave the registry half-updated.
++    Preconditions.checkNotNull(timeBucket);
++
++    if (timeBucketToId.containsKey(timeBucket)) {
++      throw new IllegalArgumentException("The timeBucket " + timeBucket + " is already registered.");
++    }
++
++    if (idToTimeBucket.containsKey(timeBucketId)) {
++      throw new IllegalArgumentException("The timeBucketId " + timeBucketId + " is already registered.");
++    }
++
++    idToTimeBucket.put(timeBucketId, timeBucket);
++    timeBucketToId.put(timeBucket, timeBucketId);
++
++    if (timeBucketId >= currentId) {
++      currentId = timeBucketId + 1;
++    }
++
++    textToTimeBucket.put(timeBucket.getText(), timeBucket);
++  }
++
++  @Override
++  public String toString()
++  {
++    return "CustomTimeBucketRegistry{" + "idToTimeBucket=" + idToTimeBucket + '}';
++  }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a4718f02/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java
----------------------------------------------------------------------
diff --cc library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java
index 0000000,0000000..9247320
new file mode 100644
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java
@@@ -1,0 -1,0 +1,114 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package com.datatorrent.lib.dimensions;
++
++import java.io.Serializable;
++
++import com.datatorrent.lib.appdata.gpo.GPOUtils.IndexSubset;
++import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
++
++/**
++ * This is a context object used to convert {@link InputEvent}s into aggregates
++ * in {@link IncrementalAggregator}s.
++ * <p>
++ * All state lives in public, mutable fields. The constructor assigns nothing, so
++ * every field starts at its Java default (null for references, 0 for ints) until
++ * a caller assigns it directly.
++ * </p>
++ * NOTE(review): several index fields below document -1 as meaning "absent", but
++ * they default to 0 unless assigned; confirm that callers always set them.
++ */
++public class DimensionsConversionContext implements Serializable
++{
++ private static final long serialVersionUID = 201506151157L;
++
++ /**
++ * The {@link CustomTimeBucketRegistry} associated with this context.
++ * NOTE(review): presumably used to resolve time bucket IDs when building
++ * aggregate keys; confirm against the aggregators that read it.
++ */
++ public CustomTimeBucketRegistry customTimeBucketRegistry;
++ /**
++ * The schema ID for {@link Aggregate}s emitted by the
++ * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
++ * holding this context.
++ */
++ public int schemaID;
++ /**
++ * The dimensionsDescriptor ID for {@link Aggregate}s emitted by the
++ * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
++ * holding this context.
++ */
++ public int dimensionsDescriptorID;
++ /**
++ * The aggregator ID for {@link Aggregate}s emitted by the
++ * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
++ * holding this context.
++ */
++ public int aggregatorID;
++ /**
++ * The {@link DimensionsDescriptor} corresponding to the given dimension
++ * descriptor id.
++ */
++ public DimensionsDescriptor dd;
++ /**
++ * The {@link FieldsDescriptor} for the aggregate of the {@link Aggregate}s
++ * emitted by the
++ * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
++ * holding this context object.
++ */
++ public FieldsDescriptor aggregateDescriptor;
++ /**
++ * The {@link FieldsDescriptor} for the key of the {@link Aggregate}s emitted
++ * by the
++ * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
++ * holding this context object.
++ */
++ public FieldsDescriptor keyDescriptor;
++ /**
++ * The index of the timestamp field within the key of {@link InputEvent}s
++ * received by the
++ * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
++ * holding this context object. This is -1 if the {@link InputEvent} key has
++ * no timestamp.
++ */
++ public int inputTimestampIndex;
++ /**
++ * The index of the timestamp field within the key of {@link Aggregate}s
++ * emitted by the
++ * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
++ * holding this context object. This is -1 if the {@link Aggregate}'s key has
++ * no timestamp.
++ */
++ public int outputTimestampIndex;
++ /**
++ * The index of the time bucket field within the key of {@link Aggregate}s
++ * emitted by the
++ * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
++ * holding this context object. This is -1 if the {@link Aggregate}'s key has
++ * no timebucket.
++ */
++ public int outputTimebucketIndex;
++ /**
++ * The {@link IndexSubset} object that is used to extract key values from
++ * {@link InputEvent}s received by this aggregator.
++ */
++ public IndexSubset indexSubsetKeys;
++ /**
++ * The {@link IndexSubset} object that is used to extract aggregate values
++ * from {@link InputEvent}s received by this aggregator.
++ */
++ public IndexSubset indexSubsetAggregates;
++
++ /**
++ * Constructor for creating conversion context. It assigns no fields; callers
++ * populate the public fields directly after construction.
++ */
++ public DimensionsConversionContext()
++ {
++ //Do nothing.
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a4718f02/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java
----------------------------------------------------------------------
diff --cc library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java
index 0000000,0000000..e593112
new file mode 100644
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java
@@@ -1,0 -1,0 +1,420 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package com.datatorrent.lib.dimensions;
++
++import java.io.Serializable;
++import java.util.ArrayList;
++import java.util.Collections;
++import java.util.List;
++import java.util.Map;
++import java.util.Objects;
++import java.util.Set;
++import java.util.concurrent.TimeUnit;
++
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import com.google.common.base.Preconditions;
++import com.google.common.collect.ImmutableSet;
++import com.google.common.collect.Maps;
++import com.google.common.collect.Sets;
++
++import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
++import com.datatorrent.lib.appdata.schemas.Fields;
++import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
++import com.datatorrent.lib.appdata.schemas.TimeBucket;
++import com.datatorrent.lib.appdata.schemas.Type;
++
++/**
++ * <p>
++ * This class defines a dimensions combination which is used by dimensions computation operators
++ * and stores. A dimension combination is composed of the names of the fields that constitute the key,
++ * as well as the TimeBucket under which data is stored.
++ * </p>
++ * <p>
++ * This class supports the creation of a dimensions combination from a {@link TimeBucket} object and a set of fields.
++ * It also supports the creation of a dimensions combination from an aggregation string. An aggregation string looks
++ * like the following:
++ * <br/>
++ * <br/>
++ * {@code
++ * "time=MINUTES:publisher:advertiser"
++ * }
++ * <br/>
++ * <br/>
++ * In the example above <b>"time=MINUTES"</b> represents a time bucket, and the other colon separated strings represent
++ * the names of the fields which comprise the key for this dimension combination. When specifying a time bucket in an
++ * aggregation string you must use the name of one of the TimeUnit enums.
++ * </p>
++ * <p>
++ * One of the primary uses of a {@link DimensionsDescriptor} is for querying a dimensional data store. When a query is
++ * received for a dimensional data store, the query must be mapped to many things including a dimensionDescriptorID. The
++ * dimensionDescriptorID is an id assigned to a class of dimension combinations which share the same keys. This
++ * mapping is performed by creating a {@link DimensionsDescriptor} object from the query, and then using the
++ * {@link DimensionsDescriptor} object to look up the correct dimensionsDescriptorID. This lookup to retrieve a
++ * dimensionsDescriptorID is necessary because a dimensionsDescriptorID is used for storage in order to prevent key
++ * conflicts.
++ * </p>
++ */
++public class DimensionsDescriptor implements Serializable, Comparable<DimensionsDescriptor>
++{
++  private static final long serialVersionUID = 201506251237L;
++
++  /**
++   * Name of the reserved time field.
++   */
++  public static final String DIMENSION_TIME = "time";
++  /**
++   * Type of the reserved time field.
++   */
++  public static final Type DIMENSION_TIME_TYPE = Type.LONG;
++  /**
++   * Name of the reserved time bucket field.
++   */
++  public static final String DIMENSION_TIME_BUCKET = "timeBucket";
++  /**
++   * Type of the reserved time bucket field.
++   */
++  public static final Type DIMENSION_TIME_BUCKET_TYPE = Type.INTEGER;
++  /**
++   * The set of fields used for time, which are intended to be queried. Note that the
++   * timeBucket field is not included here because it's not intended to be queried.
++   */
++  public static final Fields TIME_FIELDS = new Fields(Sets.newHashSet(DIMENSION_TIME));
++  /**
++   * This set represents the field names which cannot be part of the user defined field names in a schema for
++   * dimensions computation.
++   */
++  public static final Set<String> RESERVED_DIMENSION_NAMES = ImmutableSet.of(DIMENSION_TIME,
++      DIMENSION_TIME_BUCKET);
++  /**
++   * This is the equals string separator used when defining a time bucket for a dimensions combination.
++   */
++  public static final String DELIMETER_EQUALS = "=";
++  /**
++   * This separates dimensions in the dimensions combination.
++   */
++  public static final String DELIMETER_SEPERATOR = ":";
++  /**
++   * A map from a reserved key field to its type.
++   */
++  public static final Map<String, Type> DIMENSION_FIELD_TO_TYPE;
++
++  /**
++   * The time bucket used for this dimension combination. Null when no time bucket was specified.
++   */
++  private TimeBucket timeBucket;
++  /**
++   * The custom time bucket used for this dimension combination. Null when no time bucket was specified.
++   */
++  private CustomTimeBucket customTimeBucket;
++  /**
++   * The set of key fields which compose this dimension combination.
++   */
++  private Fields fields;
++
++  static {
++    Map<String, Type> dimensionFieldToType = Maps.newHashMap();
++
++    dimensionFieldToType.put(DIMENSION_TIME, DIMENSION_TIME_TYPE);
++    dimensionFieldToType.put(DIMENSION_TIME_BUCKET, DIMENSION_TIME_BUCKET_TYPE);
++
++    DIMENSION_FIELD_TO_TYPE = Collections.unmodifiableMap(dimensionFieldToType);
++  }
++
++  /**
++   * Constructor for kryo serialization.
++   */
++  private DimensionsDescriptor()
++  {
++    //for kryo
++  }
++
++  /**
++   * Creates a dimensions descriptor (dimensions combination) with the given {@link TimeBucket} and key fields.
++   *
++   * @param timeBucket The {@link TimeBucket} that this dimensions combination represents.
++   * @param fields The key fields included in this dimensions combination.
++   * @throws NullPointerException if either argument is null.
++   * @deprecated use
++   * {@link #DimensionsDescriptor(com.datatorrent.lib.appdata.schemas.CustomTimeBucket,
++   * com.datatorrent.lib.appdata.schemas.Fields)} instead.
++   */
++  @Deprecated
++  public DimensionsDescriptor(TimeBucket timeBucket,
++                              Fields fields)
++  {
++    setTimeBucket(timeBucket);
++    setFields(fields);
++  }
++
++  /**
++   * Creates a dimensions descriptor (dimensions combination) with the given {@link CustomTimeBucket} and key fields.
++   *
++   * @param timeBucket The {@link CustomTimeBucket} that this dimensions combination represents.
++   * @param fields The key fields included in this dimensions combination.
++   * @throws NullPointerException if either argument is null.
++   */
++  public DimensionsDescriptor(CustomTimeBucket timeBucket,
++                              Fields fields)
++  {
++    setCustomTimeBucket(timeBucket);
++    setFields(fields);
++  }
++
++  /**
++   * Creates a dimensions descriptor (dimensions combination) with the given key fields and no time bucket.
++   *
++   * @param fields The key fields included in this dimensions combination.
++   * @throws NullPointerException if {@code fields} is null.
++   */
++  public DimensionsDescriptor(Fields fields)
++  {
++    setFields(fields);
++  }
++
++  /**
++   * This constructor creates a dimensions descriptor (dimensions combination) from the given aggregation string.
++   *
++   * @param aggregationString The aggregation string to use when initializing this dimensions combination.
++   * @throws IllegalArgumentException if the string names the reserved time bucket field, specifies time more than
++   * once, or names a time unit which has no corresponding {@link TimeBucket}.
++   */
++  public DimensionsDescriptor(String aggregationString)
++  {
++    initialize(aggregationString);
++  }
++
++  /**
++   * Initializes the dimensions combination with the given aggregation string.
++   *
++   * @param aggregationString The aggregation string with which to initialize this dimensions combination.
++   */
++  private void initialize(String aggregationString)
++  {
++    String[] fieldArray = aggregationString.split(DELIMETER_SEPERATOR);
++    Set<String> fieldSet = Sets.newHashSet();
++
++    for (String field : fieldArray) {
++      String[] fieldAndValue = field.split(DELIMETER_EQUALS);
++      String fieldName = fieldAndValue[0];
++
++      if (fieldName.equals(DIMENSION_TIME_BUCKET)) {
++        throw new IllegalArgumentException(DIMENSION_TIME_BUCKET + " is an invalid time.");
++      }
++
++      if (!fieldName.equals(DIMENSION_TIME)) {
++        fieldSet.add(fieldName);
++        continue;
++      }
++
++      if (timeBucket != null) {
++        throw new IllegalArgumentException("Cannot specify time in a dimensions "
++            + "descriptor when a timebucket is also "
++            + "specified.");
++      }
++
++      if (fieldAndValue.length == 2) {
++        TimeUnit timeUnit = TimeUnit.valueOf(fieldAndValue[1]);
++        TimeBucket parsedTimeBucket = TimeBucket.TIME_UNIT_TO_TIME_BUCKET.get(timeUnit);
++
++        if (parsedTimeBucket == null) {
++          throw new IllegalArgumentException("The time unit " + timeUnit
++              + " does not have a corresponding time bucket.");
++        }
++
++        // Go through setTimeBucket so customTimeBucket is populated too; equals, hashCode and compareTo
++        // are all based on customTimeBucket.
++        setTimeBucket(parsedTimeBucket);
++      }
++    }
++
++    fields = new Fields(fieldSet);
++  }
++
++  /**
++   * This is a helper method which sets and validates the {@link TimeBucket}, and derives the matching
++   * {@link CustomTimeBucket} from it.
++   *
++   * @param timeBucket The {@link TimeBucket} to set and validate.
++   */
++  private void setTimeBucket(TimeBucket timeBucket)
++  {
++    Preconditions.checkNotNull(timeBucket);
++    this.timeBucket = timeBucket;
++    this.customTimeBucket = new CustomTimeBucket(timeBucket);
++  }
++
++  /**
++   * This is a helper method which sets and validates the {@link CustomTimeBucket}, and records its underlying
++   * {@link TimeBucket}.
++   *
++   * @param customTimeBucket The {@link CustomTimeBucket} to set and validate.
++   */
++  private void setCustomTimeBucket(CustomTimeBucket customTimeBucket)
++  {
++    Preconditions.checkNotNull(customTimeBucket);
++    this.customTimeBucket = customTimeBucket;
++    this.timeBucket = customTimeBucket.getTimeBucket();
++  }
++
++  /**
++   * Gets the {@link TimeBucket} for this {@link DimensionsDescriptor} object.
++   *
++   * @return The {@link TimeBucket} for this {@link DimensionsDescriptor} object, or null if there is none.
++   * @deprecated use {@link #getCustomTimeBucket()} instead.
++   */
++  @Deprecated
++  public TimeBucket getTimeBucket()
++  {
++    return timeBucket;
++  }
++
++  /**
++   * Gets the {@link CustomTimeBucket} for this {@link DimensionsDescriptor} object.
++   *
++   * @return The {@link CustomTimeBucket} for this {@link DimensionsDescriptor} object, or null if there is none.
++   */
++  public CustomTimeBucket getCustomTimeBucket()
++  {
++    return customTimeBucket;
++  }
++
++  /**
++   * This is a helper method which sets and validates the set of key fields for this
++   * {@link DimensionsDescriptor} object.
++   *
++   * @param fields The set of key fields for this {@link DimensionsDescriptor} object.
++   */
++  private void setFields(Fields fields)
++  {
++    Preconditions.checkNotNull(fields);
++    this.fields = fields;
++  }
++
++  /**
++   * Returns the set of key fields for this {@link DimensionsDescriptor} object.
++   *
++   * @return The set of key fields for this {@link DimensionsDescriptor} object.
++   */
++  public Fields getFields()
++  {
++    return fields;
++  }
++
++  /**
++   * This method is used to create a new {@link FieldsDescriptor} object representing this
++   * {@link DimensionsDescriptor} object from another {@link FieldsDescriptor} object which
++   * defines the names and types of all the available key fields. Reserved field names in this
++   * descriptor's fields are skipped; the time and time bucket fields are added when this descriptor
++   * has a time bucket other than {@link TimeBucket#ALL}.
++   *
++   * @param parentDescriptor The {@link FieldsDescriptor} object which defines the name and
++   * type of all the available key fields.
++   * @return A {@link FieldsDescriptor} object which represents this {@link DimensionsDescriptor} (dimensions
++   * combination) derived from the given {@link FieldsDescriptor} object.
++   */
++  public FieldsDescriptor createFieldsDescriptor(FieldsDescriptor parentDescriptor)
++  {
++    Map<String, Type> fieldToType = Maps.newHashMap();
++    Map<String, Type> parentFieldToType = parentDescriptor.getFieldToType();
++
++    for (String field : this.fields.getFields()) {
++      if (RESERVED_DIMENSION_NAMES.contains(field)) {
++        continue;
++      }
++
++      fieldToType.put(field, parentFieldToType.get(field));
++    }
++
++    if (timeBucket != null && timeBucket != TimeBucket.ALL) {
++      fieldToType.put(DIMENSION_TIME_BUCKET, DIMENSION_TIME_BUCKET_TYPE);
++      fieldToType.put(DIMENSION_TIME, DIMENSION_TIME_TYPE);
++    }
++
++    return new FieldsDescriptor(fieldToType);
++  }
++
++  @Override
++  public int hashCode()
++  {
++    int hash = 7;
++    hash = 83 * hash + (this.customTimeBucket != null ? this.customTimeBucket.hashCode() : 0);
++    hash = 83 * hash + (this.fields != null ? this.fields.hashCode() : 0);
++    return hash;
++  }
++
++  /**
++   * Two descriptors are equal when they have equal custom time buckets (both possibly null) and equal key fields.
++   */
++  @Override
++  public boolean equals(Object obj)
++  {
++    if (this == obj) {
++      return true;
++    }
++    if (obj == null || getClass() != obj.getClass()) {
++      return false;
++    }
++    final DimensionsDescriptor other = (DimensionsDescriptor)obj;
++    // Objects.equals tolerates a null customTimeBucket, which descriptors without a time bucket have.
++    return Objects.equals(this.customTimeBucket, other.customTimeBucket)
++        && Objects.equals(this.fields, other.fields);
++  }
++
++  @Override
++  public String toString()
++  {
++    return "DimensionsDescriptor{" + "timeBucket=" + customTimeBucket + ", fields=" + fields + '}';
++  }
++
++  /**
++   * Orders descriptors first by number of key fields, then lexicographically by their sorted key field names,
++   * and finally by custom time bucket, where a descriptor without a time bucket sorts first.
++   */
++  @Override
++  public int compareTo(DimensionsDescriptor other)
++  {
++    if (this == other) {
++      return 0;
++    }
++
++    // Sort copies: the lists returned by Fields.getFieldsList() belong to the Fields objects and must not
++    // be reordered as a side effect of comparing.
++    List<String> thisFieldList = new ArrayList<>(this.getFields().getFieldsList());
++    List<String> otherFieldList = new ArrayList<>(other.getFields().getFieldsList());
++
++    int compare = Integer.compare(thisFieldList.size(), otherFieldList.size());
++
++    if (compare != 0) {
++      return compare;
++    }
++
++    Collections.sort(thisFieldList);
++    Collections.sort(otherFieldList);
++
++    for (int index = 0; index < thisFieldList.size(); index++) {
++      int fieldCompare = thisFieldList.get(index).compareTo(otherFieldList.get(index));
++
++      if (fieldCompare != 0) {
++        return fieldCompare;
++      }
++    }
++
++    CustomTimeBucket thisBucket = this.getCustomTimeBucket();
++    CustomTimeBucket otherBucket = other.getCustomTimeBucket();
++
++    if (thisBucket == null && otherBucket == null) {
++      return 0;
++    } else if (thisBucket == null) {
++      return -1;
++    } else if (otherBucket == null) {
++      return 1;
++    } else {
++      return thisBucket.compareTo(otherBucket);
++    }
++  }
++
++  private static final Logger LOG = LoggerFactory.getLogger(DimensionsDescriptor.class);
++}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a4718f02/library/src/test/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistryTest.java
----------------------------------------------------------------------
diff --cc library/src/test/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistryTest.java
index 0000000,0000000..5c4feed
new file mode 100644
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistryTest.java
@@@ -1,0 -1,0 +1,87 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package com.datatorrent.lib.dimensions;
++
++import org.junit.Assert;
++import org.junit.Test;
++
++import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
++import com.datatorrent.lib.appdata.schemas.TimeBucket;
++import com.datatorrent.lib.dimensions.CustomTimeBucketRegistry;
++
++
++/**
++ * Tests for {@link CustomTimeBucketRegistry}.
++ */
++public class CustomTimeBucketRegistryTest
++{
++  /**
++   * Registers the single-unit minute, hour and day buckets under their enum ordinals, then checks both the
++   * ID to bucket and the bucket to ID lookups.
++   */
++  @Test
++  public void testBuildingRegistry()
++  {
++    TimeBucket[] units = new TimeBucket[] {TimeBucket.MINUTE, TimeBucket.HOUR, TimeBucket.DAY};
++    CustomTimeBucket[] buckets = new CustomTimeBucket[units.length];
++    CustomTimeBucketRegistry registry = new CustomTimeBucketRegistry();
++
++    for (int i = 0; i < units.length; i++) {
++      buckets[i] = new CustomTimeBucket(units[i]);
++    }
++
++    for (int i = 0; i < units.length; i++) {
++      registry.register(buckets[i], units[i].ordinal());
++    }
++
++    for (TimeBucket unit : units) {
++      CustomTimeBucket found = registry.getTimeBucket(unit.ordinal());
++      Assert.assertTrue(found.isUnit());
++      Assert.assertEquals(unit, found.getTimeBucket());
++    }
++
++    for (int i = 0; i < units.length; i++) {
++      Assert.assertEquals(units[i].ordinal(), (int)registry.getTimeBucketId(buckets[i]));
++    }
++  }
++
++  /**
++   * After explicit registrations, a registration without an ID must receive one past the largest explicit ID.
++   */
++  @Test
++  public void testRegister()
++  {
++    CustomTimeBucketRegistry registry = new CustomTimeBucketRegistry();
++    TimeBucket[] units = new TimeBucket[] {TimeBucket.MINUTE, TimeBucket.HOUR, TimeBucket.DAY};
++
++    for (TimeBucket unit : units) {
++      registry.register(new CustomTimeBucket(unit), unit.ordinal());
++    }
++
++    int largestId = Integer.MIN_VALUE;
++    for (TimeBucket unit : units) {
++      largestId = Math.max(largestId, unit.ordinal());
++    }
++
++    CustomTimeBucket fiveMinutes = new CustomTimeBucket(TimeBucket.MINUTE, 5L);
++    registry.register(fiveMinutes);
++
++    int assignedId = registry.getTimeBucketId(fiveMinutes);
++    Assert.assertEquals(largestId + 1, assignedId);
++  }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a4718f02/library/src/test/java/com/datatorrent/lib/dimensions/DimensionsDescriptorTest.java
----------------------------------------------------------------------
diff --cc library/src/test/java/com/datatorrent/lib/dimensions/DimensionsDescriptorTest.java
index 0000000,0000000..54682b1
new file mode 100644
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/dimensions/DimensionsDescriptorTest.java
@@@ -1,0 -1,0 +1,101 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package com.datatorrent.lib.dimensions;
++
++import java.util.Set;
++import java.util.concurrent.TimeUnit;
++
++import com.google.common.collect.Sets;
++
++import org.junit.Assert;
++import org.junit.Test;
++
++import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
++import com.datatorrent.lib.appdata.schemas.Fields;
++import com.datatorrent.lib.appdata.schemas.TimeBucket;
++import com.datatorrent.lib.appdata.schemas.Type;
++import com.datatorrent.lib.dimensions.DimensionsDescriptor;
++
++/**
++ * Tests for {@link DimensionsDescriptor}.
++ */
++public class DimensionsDescriptorTest
++{
++  public static final String KEY_1_NAME = "key1";
++  public static final Type KEY_1_TYPE = Type.INTEGER;
++  public static final String KEY_2_NAME = "key2";
++  public static final Type KEY_2_TYPE = Type.STRING;
++
++  public static final String AGG_1_NAME = "agg1";
++  public static final Type AGG_1_TYPE = Type.INTEGER;
++  public static final String AGG_2_NAME = "agg2";
++  public static final Type AGG_2_TYPE = Type.STRING;
++
++  /**
++   * A single-key aggregation string yields that key and no time bucket.
++   */
++  @Test
++  public void simpleTest1()
++  {
++    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME);
++
++    Set<String> fields = Sets.newHashSet();
++    fields.add(KEY_1_NAME);
++
++    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
++    Assert.assertNull("The timeunit should be null.", ad.getTimeBucket());
++  }
++
++  /**
++   * A two-key aggregation string yields both keys and no time bucket.
++   */
++  @Test
++  public void simpleTest2()
++  {
++    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME +
++                                                       DimensionsDescriptor.DELIMETER_SEPERATOR +
++                                                       KEY_2_NAME);
++
++    Set<String> fields = Sets.newHashSet();
++    fields.add(KEY_1_NAME);
++    fields.add(KEY_2_NAME);
++
++    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
++    Assert.assertNull("The timeunit should be null.", ad.getTimeBucket());
++  }
++
++  /**
++   * A "time=DAYS" entry sets the time bucket and is not treated as a key field.
++   */
++  @Test
++  public void simpleTimeTest()
++  {
++    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME +
++                                                       DimensionsDescriptor.DELIMETER_SEPERATOR +
++                                                       DimensionsDescriptor.DIMENSION_TIME +
++                                                       DimensionsDescriptor.DELIMETER_EQUALS +
++                                                       "DAYS");
++
++    Set<String> fields = Sets.newHashSet();
++    fields.add(KEY_1_NAME);
++
++    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
++    Assert.assertEquals("The timeunit should be DAYS.", TimeUnit.DAYS, ad.getTimeBucket().getTimeUnit());
++  }
++
++  /**
++   * Descriptors built from equal time buckets and equal fields must be equal in both directions and must
++   * have equal hash codes.
++   */
++  @Test
++  public void equalsAndHashCodeTest()
++  {
++    DimensionsDescriptor ddA = new DimensionsDescriptor(new CustomTimeBucket(TimeBucket.MINUTE, 5L),
++                                                        new Fields(Sets.newHashSet("a", "b")));
++
++    DimensionsDescriptor ddB = new DimensionsDescriptor(new CustomTimeBucket(TimeBucket.MINUTE, 5L),
++                                                        new Fields(Sets.newHashSet("a", "b")));
++
++    Assert.assertTrue(ddB.equals(ddA));
++    Assert.assertTrue(ddA.equals(ddB));
++    Assert.assertEquals("Equal descriptors must have equal hash codes.", ddA.hashCode(), ddB.hashCode());
++  }
++}