You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 20:12:25 UTC

[08/50] [abbrv] incubator-apex-malhar git commit: MLHR-1944 #resolve #comment added dimensions helper classes

MLHR-1944 #resolve #comment added dimensions helper classes


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a4718f02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a4718f02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a4718f02

Branch: refs/heads/master
Commit: a4718f02434736bf6d9e5f98d01e923e950503f5
Parents: c31c5ea 691ee6f
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Tue Dec 15 12:45:13 2015 -0800
Committer: brightchen <br...@datatorrent.com>
Committed: Tue Dec 15 16:47:53 2015 -0800

----------------------------------------------------------------------
 .../dimensions/CustomTimeBucketRegistry.java    | 138 ++++++
 .../dimensions/DimensionsConversionContext.java | 114 +++++
 .../lib/dimensions/DimensionsDescriptor.java    | 420 +++++++++++++++++++
 .../dimensions/aggregator/AggregateEvent.java   |  35 ++
 .../CustomTimeBucketRegistryTest.java           |  87 ++++
 .../dimensions/DimensionsDescriptorTest.java    | 101 +++++
 6 files changed, 895 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a4718f02/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java
----------------------------------------------------------------------
diff --cc library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java
index 0000000,0000000..fc11647
new file mode 100644
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java
@@@ -1,0 -1,0 +1,138 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package com.datatorrent.lib.dimensions;
++
++import java.io.Serializable;
++import java.util.HashMap;
++import java.util.Map;
++
++import com.google.common.base.Preconditions;
++
++import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
++
++import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
++import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
++import it.unimi.dsi.fastutil.objects.Object2IntMap;
++import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
++
++/**
++ * Registry which assigns integer ids to {@link CustomTimeBucket}s. It supports lookups from id to
++ * time bucket, from time bucket to id, and from a time bucket's text representation to the time bucket.
++ */
++public class CustomTimeBucketRegistry implements Serializable
++{
++  private static final long serialVersionUID = 201509221536L;
++
++  /**
++   * The id handed out by {@link #register(CustomTimeBucket)}. Registering a time bucket under an id
++   * greater than or equal to this value advances it to one past that id.
++   */
++  private int currentId;
++
++  private Int2ObjectMap<CustomTimeBucket> idToTimeBucket = new Int2ObjectOpenHashMap<>();
++  private Object2IntMap<CustomTimeBucket> timeBucketToId = new Object2IntOpenHashMap<>();
++  private Map<String, CustomTimeBucket> textToTimeBucket = new HashMap<>();
++
++  /**
++   * Creates an empty registry whose automatically assigned ids start at 0.
++   */
++  public CustomTimeBucketRegistry()
++  {
++  }
++
++  /**
++   * Creates an empty registry whose automatically assigned ids start at the given value.
++   *
++   * @param startingId The first id handed out by {@link #register(CustomTimeBucket)}.
++   */
++  public CustomTimeBucketRegistry(int startingId)
++  {
++    this.currentId = startingId;
++  }
++
++  /**
++   * Creates a registry pre-populated with the given id to time bucket mapping. Automatically
++   * assigned ids start one past the largest id in the mapping (but never below 0).
++   *
++   * @param idToTimeBucket The initial mapping. It is used directly (not copied), and later
++   *                       registrations are added to it.
++   * @throws NullPointerException if the mapping or any of its values is null.
++   */
++  public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket)
++  {
++    int largestId = initialize(idToTimeBucket);
++
++    if (!idToTimeBucket.isEmpty() && largestId < Integer.MAX_VALUE) {
++      // Advance past the supplied ids so register(CustomTimeBucket) does not reuse one of them.
++      this.currentId = Math.max(this.currentId, largestId + 1);
++    }
++  }
++
++  /**
++   * Creates a registry pre-populated with the given mapping, whose automatically assigned ids
++   * start at the given value.
++   *
++   * @param idToTimeBucket The initial mapping. It is used directly (not copied), and later
++   *                       registrations are added to it.
++   * @param startingId     The first automatically assigned id. It must be larger than every id in
++   *                       {@code idToTimeBucket}.
++   * @throws NullPointerException     if the mapping or any of its values is null.
++   * @throws IllegalArgumentException if {@code startingId} is not larger than the largest id.
++   */
++  public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket,
++      int startingId)
++  {
++    int largestId = initialize(idToTimeBucket);
++
++    Preconditions.checkArgument(largestId < startingId,
++        "The startingId %s must be larger than the largest ID %s in the given idToTimeBucket mapping",
++        startingId, largestId);
++
++    this.currentId = startingId;
++  }
++
++  /**
++   * Adopts the given id to time bucket mapping. It also builds the reverse (time bucket to id)
++   * lookup and the text lookup from that mapping.
++   *
++   * @param idToTimeBucket The mapping to adopt.
++   * @return The largest id in the mapping, or {@link Integer#MIN_VALUE} if the mapping is empty.
++   * @throws NullPointerException if the mapping or any of its values is null.
++   */
++  private int initialize(Int2ObjectMap<CustomTimeBucket> idToTimeBucket)
++  {
++    this.idToTimeBucket = Preconditions.checkNotNull(idToTimeBucket);
++
++    int largestId = Integer.MIN_VALUE;
++
++    for (int timeBucketId : idToTimeBucket.keySet()) {
++      // Validate before dereferencing so a null value reports a clear error.
++      CustomTimeBucket customTimeBucket = Preconditions.checkNotNull(idToTimeBucket.get(timeBucketId),
++          "The time bucket registered under id %s is null", timeBucketId);
++      largestId = Math.max(largestId, timeBucketId);
++      timeBucketToId.put(customTimeBucket, timeBucketId);
++      textToTimeBucket.put(customTimeBucket.getText(), customTimeBucket);
++    }
++
++    return largestId;
++  }
++
++  /**
++   * @param timeBucketId The id to look up.
++   * @return The time bucket registered under the given id, or null if there is none.
++   */
++  public CustomTimeBucket getTimeBucket(int timeBucketId)
++  {
++    return idToTimeBucket.get(timeBucketId);
++  }
++
++  /**
++   * @param timeBucket The time bucket to look up.
++   * @return The id of the given time bucket, or null if it is not registered.
++   */
++  public Integer getTimeBucketId(CustomTimeBucket timeBucket)
++  {
++    if (!timeBucketToId.containsKey(timeBucket)) {
++      return null;
++    }
++
++    return timeBucketToId.getInt(timeBucket);
++  }
++
++  /**
++   * @param text The text representation of a time bucket.
++   * @return The registered time bucket with the given text, or null if there is none.
++   */
++  public CustomTimeBucket getTimeBucket(String text)
++  {
++    return textToTimeBucket.get(text);
++  }
++
++  /**
++   * Registers the given time bucket under the next automatically assigned id.
++   *
++   * @param timeBucket The time bucket to register.
++   * @throws IllegalArgumentException if the time bucket or the next id is already registered.
++   */
++  public void register(CustomTimeBucket timeBucket)
++  {
++    register(timeBucket, currentId);
++  }
++
++  /**
++   * Registers the given time bucket under the given id.
++   *
++   * @param timeBucket   The time bucket to register.
++   * @param timeBucketId The id to register it under.
++   * @throws NullPointerException     if {@code timeBucket} is null.
++   * @throws IllegalArgumentException if the time bucket or the id is already registered.
++   */
++  public void register(CustomTimeBucket timeBucket, int timeBucketId)
++  {
++    // Checked up front so a null does not leave the lookup maps partially updated.
++    Preconditions.checkNotNull(timeBucket);
++
++    if (timeBucketToId.containsKey(timeBucket)) {
++      throw new IllegalArgumentException("The timeBucket " + timeBucket + " is already registered.");
++    }
++
++    if (idToTimeBucket.containsKey(timeBucketId)) {
++      throw new IllegalArgumentException("The timeBucketId " + timeBucketId + " is already registered.");
++    }
++
++    idToTimeBucket.put(timeBucketId, timeBucket);
++    timeBucketToId.put(timeBucket, timeBucketId);
++
++    if (timeBucketId >= currentId) {
++      currentId = timeBucketId + 1;
++    }
++
++    textToTimeBucket.put(timeBucket.getText(), timeBucket);
++  }
++
++  @Override
++  public String toString()
++  {
++    return "CustomTimeBucketRegistry{" + "idToTimeBucket=" + idToTimeBucket + '}';
++  }
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a4718f02/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java
----------------------------------------------------------------------
diff --cc library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java
index 0000000,0000000..9247320
new file mode 100644
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java
@@@ -1,0 -1,0 +1,114 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package com.datatorrent.lib.dimensions;
++
++import java.io.Serializable;
++
++import com.datatorrent.lib.appdata.gpo.GPOUtils.IndexSubset;
++import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
++
++/**
++ * This is a context object used to convert {@code InputEvent}s into aggregates
++ * in {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s.
++ */
++public class DimensionsConversionContext implements Serializable
++{
++  private static final long serialVersionUID = 201506151157L;
++
++  /**
++   * The {@link CustomTimeBucketRegistry} made available to the
++   * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
++   * holding this context.
++   */
++  public CustomTimeBucketRegistry customTimeBucketRegistry;
++  /**
++   * The schema ID for {@code Aggregate}s emitted by the
++   * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
++   * holding this context.
++   */
++  public int schemaID;
++  /**
++   * The dimensionsDescriptor ID for {@code Aggregate}s emitted by the
++   * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
++   * holding this context.
++   */
++  public int dimensionsDescriptorID;
++  /**
++   * The aggregator ID for {@code Aggregate}s emitted by the
++   * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
++   * holding this context.
++   */
++  public int aggregatorID;
++  /**
++   * The {@link DimensionsDescriptor} corresponding to the given dimensions
++   * descriptor ID.
++   */
++  public DimensionsDescriptor dd;
++  /**
++   * The {@link FieldsDescriptor} for the aggregate of the {@code Aggregate}s
++   * emitted by the
++   * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
++   * holding this context object.
++   */
++  public FieldsDescriptor aggregateDescriptor;
++  /**
++   * The {@link FieldsDescriptor} for the key of the {@code Aggregate}s emitted
++   * by the
++   * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
++   * holding this context object.
++   */
++  public FieldsDescriptor keyDescriptor;
++  /**
++   * The index of the timestamp field within the key of {@code InputEvent}s
++   * received by the
++   * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
++   * holding this context object. This is -1 if the {@code InputEvent} key has
++   * no timestamp.
++   */
++  public int inputTimestampIndex;
++  /**
++   * The index of the timestamp field within the key of {@code Aggregate}s
++   * emitted by the
++   * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
++   * holding this context object. This is -1 if the {@code Aggregate}'s key has
++   * no timestamp.
++   */
++  public int outputTimestampIndex;
++  /**
++   * The index of the time bucket field within the key of {@code Aggregate}s
++   * emitted by the
++   * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s
++   * holding this context object. This is -1 if the {@code Aggregate}'s key has
++   * no time bucket.
++   */
++  public int outputTimebucketIndex;
++  /**
++   * The {@link IndexSubset} object that is used to extract key values from
++   * {@code InputEvent}s received by this aggregator.
++   */
++  public IndexSubset indexSubsetKeys;
++  /**
++   * The {@link IndexSubset} object that is used to extract aggregate values
++   * from {@code InputEvent}s received by this aggregator.
++   */
++  public IndexSubset indexSubsetAggregates;
++
++  /**
++   * Creates an empty conversion context. All fields are populated by the caller.
++   */
++  public DimensionsConversionContext()
++  {
++    //Do nothing.
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a4718f02/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java
----------------------------------------------------------------------
diff --cc library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java
index 0000000,0000000..e593112
new file mode 100644
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java
@@@ -1,0 -1,0 +1,420 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package com.datatorrent.lib.dimensions;
++
++import java.io.Serializable;
++import java.util.ArrayList;
++import java.util.Collections;
++import java.util.List;
++import java.util.Map;
++import java.util.Set;
++import java.util.concurrent.TimeUnit;
++
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import com.google.common.base.Preconditions;
++import com.google.common.collect.ImmutableSet;
++import com.google.common.collect.Maps;
++import com.google.common.collect.Sets;
++
++import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
++import com.datatorrent.lib.appdata.schemas.Fields;
++import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
++import com.datatorrent.lib.appdata.schemas.TimeBucket;
++import com.datatorrent.lib.appdata.schemas.Type;
++
++/**
++ * <p>
++ * This class defines a dimensions combination which is used by dimensions computation operators
++ * and stores. A dimensions combination is composed of the names of the fields that constitute the key,
++ * as well as the TimeBucket under which data is stored.
++ * </p>
++ * <p>
++ * This class supports the creation of a dimensions combination from a {@link TimeBucket} object and a set of fields.
++ * It also supports the creation of a dimensions combination from an aggregation string. An aggregation string looks
++ * like the following:
++ * </p>
++ * <pre>{@code
++ * "time=MINUTES:publisher:advertiser"
++ * }</pre>
++ * <p>
++ * In the example above <b>"time=MINUTES"</b> represents a time bucket, and the other colon separated strings represent
++ * the names of the fields which comprise the key for this dimensions combination. When specifying a time bucket in an
++ * aggregation string you must use the name of one of the TimeUnit enums.
++ * </p>
++ * <p>
++ * One of the primary uses of a {@link DimensionsDescriptor} is for querying a dimensional data store. When a query is
++ * received for a dimensional data store, the query must be mapped to many things, including a dimensionsDescriptorID.
++ * The dimensionsDescriptorID is an id assigned to a class of dimension combinations which share the same keys. This
++ * mapping is performed by creating a {@link DimensionsDescriptor} object from the query, and then using the
++ * {@link DimensionsDescriptor} object to look up the correct dimensionsDescriptorID. This lookup is necessary
++ * because a dimensionsDescriptorID is used for storage in order to prevent key conflicts.
++ * </p>
++ */
++public class DimensionsDescriptor implements Serializable, Comparable<DimensionsDescriptor>
++{
++  private static final long serialVersionUID = 201506251237L;
++
++  /**
++   * Name of the reserved time field.
++   */
++  public static final String DIMENSION_TIME = "time";
++  /**
++   * Type of the reserved time field.
++   */
++  public static final Type DIMENSION_TIME_TYPE = Type.LONG;
++  /**
++   * Name of the reserved time bucket field.
++   */
++  public static final String DIMENSION_TIME_BUCKET = "timeBucket";
++  /**
++   * Type of the reserved time bucket field.
++   */
++  public static final Type DIMENSION_TIME_BUCKET_TYPE = Type.INTEGER;
++  /**
++   * The set of fields used for time which are intended to be queried. Note that the
++   * timeBucket field is not included here because it is not intended to be queried.
++   */
++  public static final Fields TIME_FIELDS = new Fields(Sets.newHashSet(DIMENSION_TIME));
++  /**
++   * This set represents the field names which cannot be part of the user defined field names in a schema for
++   * dimensions computation.
++   */
++  public static final Set<String> RESERVED_DIMENSION_NAMES = ImmutableSet.of(DIMENSION_TIME,
++      DIMENSION_TIME_BUCKET);
++  /**
++   * This is the equals string separator used when defining a time bucket for a dimensions combination.
++   */
++  public static final String DELIMETER_EQUALS = "=";
++  /**
++   * This separates dimensions in the dimensions combination.
++   */
++  public static final String DELIMETER_SEPERATOR = ":";
++  /**
++   * A map from a reserved key field to its type.
++   */
++  public static final Map<String, Type> DIMENSION_FIELD_TO_TYPE;
++
++  /**
++   * The time bucket used for this dimensions combination. May be null if no time bucket was specified.
++   */
++  private TimeBucket timeBucket;
++  /**
++   * The custom time bucket used for this dimensions combination. May be null if no time bucket was specified.
++   */
++  private CustomTimeBucket customTimeBucket;
++  /**
++   * The set of key fields which compose this dimensions combination.
++   */
++  private Fields fields;
++
++  static {
++    Map<String, Type> dimensionFieldToType = Maps.newHashMap();
++
++    dimensionFieldToType.put(DIMENSION_TIME, DIMENSION_TIME_TYPE);
++    dimensionFieldToType.put(DIMENSION_TIME_BUCKET, DIMENSION_TIME_BUCKET_TYPE);
++
++    DIMENSION_FIELD_TO_TYPE = Collections.unmodifiableMap(dimensionFieldToType);
++  }
++
++  /**
++   * Constructor for kryo serialization.
++   */
++  private DimensionsDescriptor()
++  {
++    //for kryo
++  }
++
++  /**
++   * Creates a dimensions descriptor (dimensions combination) with the given {@link TimeBucket} and key fields.
++   *
++   * @param timeBucket The {@link TimeBucket} that this dimensions combination represents.
++   * @param fields     The key fields included in this dimensions combination.
++   * @deprecated use
++   * {@link #DimensionsDescriptor(com.datatorrent.lib.appdata.schemas.CustomTimeBucket,
++   * com.datatorrent.lib.appdata.schemas.Fields)} instead.
++   */
++  @Deprecated
++  public DimensionsDescriptor(TimeBucket timeBucket,
++      Fields fields)
++  {
++    setTimeBucket(timeBucket);
++    setFields(fields);
++  }
++
++  /**
++   * Creates a dimensions descriptor (dimensions combination) with the given {@link CustomTimeBucket} and key fields.
++   *
++   * @param timeBucket The {@link CustomTimeBucket} that this dimensions combination represents.
++   * @param fields     The key fields included in this dimensions combination.
++   */
++  public DimensionsDescriptor(CustomTimeBucket timeBucket,
++      Fields fields)
++  {
++    setCustomTimeBucket(timeBucket);
++    setFields(fields);
++  }
++
++  /**
++   * Creates a dimensions descriptor (dimensions combination) with the given key fields and no time bucket.
++   *
++   * @param fields The key fields included in this dimensions combination.
++   */
++  public DimensionsDescriptor(Fields fields)
++  {
++    setFields(fields);
++  }
++
++  /**
++   * This constructor creates a dimensions descriptor (dimensions combination) from the given aggregation string.
++   *
++   * @param aggregationString The aggregation string to use when initializing this dimensions combination.
++   */
++  public DimensionsDescriptor(String aggregationString)
++  {
++    initialize(aggregationString);
++  }
++
++  /**
++   * Initializes the dimensions combination with the given aggregation string.
++   *
++   * @param aggregationString The aggregation string with which to initialize this dimensions combination.
++   * @throws IllegalArgumentException if the reserved time bucket field is used, if time is specified more than
++   *                                  once, or if the time value is not the name of a {@link TimeUnit}.
++   */
++  private void initialize(String aggregationString)
++  {
++    String[] fieldArray = aggregationString.split(DELIMETER_SEPERATOR);
++    Set<String> fieldSet = Sets.newHashSet();
++
++    for (String field : fieldArray) {
++      String[] fieldAndValue = field.split(DELIMETER_EQUALS);
++      String fieldName = fieldAndValue[0];
++
++      if (fieldName.equals(DIMENSION_TIME_BUCKET)) {
++        throw new IllegalArgumentException(DIMENSION_TIME_BUCKET + " is an invalid time.");
++      }
++
++      if (!fieldName.equals(DIMENSION_TIME)) {
++        fieldSet.add(fieldName);
++        continue;
++      }
++
++      if (timeBucket != null) {
++        throw new IllegalArgumentException("Cannot specify time in a dimensions "
++            + "descriptor when a timebucket is also "
++            + "specified.");
++      }
++
++      if (fieldAndValue.length == 2) {
++        TimeBucket parsedTimeBucket =
++            TimeBucket.TIME_UNIT_TO_TIME_BUCKET.get(TimeUnit.valueOf(fieldAndValue[1]));
++
++        if (parsedTimeBucket != null) {
++          // Go through setTimeBucket so customTimeBucket (used by equals/hashCode/compareTo) is set too.
++          setTimeBucket(parsedTimeBucket);
++        }
++      }
++    }
++
++    fields = new Fields(fieldSet);
++  }
++
++  /**
++   * This is a helper method which sets and validates the {@link TimeBucket}. It also sets the
++   * corresponding {@link CustomTimeBucket}.
++   *
++   * @param timeBucket The {@link TimeBucket} to set and validate.
++   */
++  private void setTimeBucket(TimeBucket timeBucket)
++  {
++    Preconditions.checkNotNull(timeBucket);
++    this.timeBucket = timeBucket;
++    this.customTimeBucket = new CustomTimeBucket(timeBucket);
++  }
++
++  /**
++   * This is a helper method which sets and validates the {@link CustomTimeBucket}. It also sets the
++   * corresponding {@link TimeBucket}.
++   *
++   * @param customTimeBucket The {@link CustomTimeBucket} to set and validate.
++   */
++  private void setCustomTimeBucket(CustomTimeBucket customTimeBucket)
++  {
++    Preconditions.checkNotNull(customTimeBucket);
++    this.customTimeBucket = customTimeBucket;
++    this.timeBucket = customTimeBucket.getTimeBucket();
++  }
++
++  /**
++   * Gets the {@link TimeBucket} for this {@link DimensionsDescriptor} object.
++   *
++   * @return The {@link TimeBucket} for this {@link DimensionsDescriptor} object, or null if none was specified.
++   * @deprecated use {@link #getCustomTimeBucket()} instead.
++   */
++  @Deprecated
++  public TimeBucket getTimeBucket()
++  {
++    return timeBucket;
++  }
++
++  /**
++   * Gets the {@link CustomTimeBucket} for this {@link DimensionsDescriptor} object.
++   *
++   * @return The {@link CustomTimeBucket} for this {@link DimensionsDescriptor} object, or null if none was specified.
++   */
++  public CustomTimeBucket getCustomTimeBucket()
++  {
++    return customTimeBucket;
++  }
++
++  /**
++   * This is a helper method which sets and validates the set of key fields for this
++   * {@link DimensionsDescriptor} object.
++   *
++   * @param fields The set of key fields for this {@link DimensionsDescriptor} object.
++   */
++  private void setFields(Fields fields)
++  {
++    Preconditions.checkNotNull(fields);
++    this.fields = fields;
++  }
++
++  /**
++   * Returns the set of key fields for this {@link DimensionsDescriptor} object.
++   *
++   * @return The set of key fields for this {@link DimensionsDescriptor} object.
++   */
++  public Fields getFields()
++  {
++    return fields;
++  }
++
++  /**
++   * This method is used to create a new {@link FieldsDescriptor} object representing this
++   * {@link DimensionsDescriptor} object from another {@link FieldsDescriptor} object which
++   * defines the names and types of all the available key fields.
++   *
++   * @param parentDescriptor The {@link FieldsDescriptor} object which defines the name and
++   *                         type of all the available key fields.
++   * @return A {@link FieldsDescriptor} object which represents this {@link DimensionsDescriptor}
++   * (dimensions combination), derived from the given {@link FieldsDescriptor} object. The reserved
++   * time and time bucket fields are added only when a time bucket other than {@link TimeBucket#ALL} is set.
++   */
++  public FieldsDescriptor createFieldsDescriptor(FieldsDescriptor parentDescriptor)
++  {
++    Map<String, Type> fieldToType = Maps.newHashMap();
++    Map<String, Type> parentFieldToType = parentDescriptor.getFieldToType();
++
++    for (String field : this.fields.getFields()) {
++      if (RESERVED_DIMENSION_NAMES.contains(field)) {
++        continue;
++      }
++
++      fieldToType.put(field, parentFieldToType.get(field));
++    }
++
++    if (timeBucket != null && timeBucket != TimeBucket.ALL) {
++      fieldToType.put(DIMENSION_TIME_BUCKET, DIMENSION_TIME_BUCKET_TYPE);
++      fieldToType.put(DIMENSION_TIME, DIMENSION_TIME_TYPE);
++    }
++
++    return new FieldsDescriptor(fieldToType);
++  }
++
++  @Override
++  public int hashCode()
++  {
++    int hash = 7;
++    hash = 83 * hash + (this.customTimeBucket != null ? this.customTimeBucket.hashCode() : 0);
++    hash = 83 * hash + (this.fields != null ? this.fields.hashCode() : 0);
++    return hash;
++  }
++
++  @Override
++  public boolean equals(Object obj)
++  {
++    if (this == obj) {
++      return true;
++    }
++    if (obj == null || getClass() != obj.getClass()) {
++      return false;
++    }
++    final DimensionsDescriptor other = (DimensionsDescriptor)obj;
++    // The time bucket is null for descriptors created without one, so compare null-safely.
++    boolean sameTimeBucket = (this.customTimeBucket == null)
++        ? other.customTimeBucket == null
++        : this.customTimeBucket.equals(other.customTimeBucket);
++    boolean sameFields = (this.fields == null)
++        ? other.fields == null
++        : this.fields.equals(other.fields);
++    return sameTimeBucket && sameFields;
++  }
++
++  @Override
++  public String toString()
++  {
++    return "DimensionsDescriptor{" + "timeBucket=" + customTimeBucket + ", fields=" + fields + '}';
++  }
++
++  /**
++   * Orders descriptors first by their key fields (see {@link #compareFields(Fields, Fields)}) and then by
++   * their {@link CustomTimeBucket}. A descriptor without a time bucket sorts before one with a time bucket.
++   */
++  @Override
++  public int compareTo(DimensionsDescriptor other)
++  {
++    if (this == other) {
++      return 0;
++    }
++
++    int fieldCompare = compareFields(this.getFields(), other.getFields());
++
++    if (fieldCompare != 0) {
++      return fieldCompare;
++    }
++
++    CustomTimeBucket thisBucket = this.getCustomTimeBucket();
++    CustomTimeBucket otherBucket = other.getCustomTimeBucket();
++
++    if (thisBucket == null) {
++      return otherBucket == null ? 0 : -1;
++    }
++
++    if (otherBucket == null) {
++      return 1;
++    }
++
++    return thisBucket.compareTo(otherBucket);
++  }
++
++  /**
++   * Orders two sets of key fields. The smaller set comes first; sets of equal size are compared
++   * lexicographically by their sorted field names. Sorting is done on copies so that neither
++   * {@link Fields} object is modified.
++   *
++   * @param thisFields  The first set of key fields.
++   * @param otherFields The second set of key fields.
++   * @return A negative, zero, or positive value, following the {@link Comparable} contract.
++   */
++  private static int compareFields(Fields thisFields, Fields otherFields)
++  {
++    if (thisFields == otherFields) {
++      return 0;
++    }
++
++    List<String> thisFieldList = new ArrayList<>(thisFields.getFieldsList());
++    List<String> otherFieldList = new ArrayList<>(otherFields.getFieldsList());
++
++    int sizeCompare = Integer.compare(thisFieldList.size(), otherFieldList.size());
++
++    if (sizeCompare != 0) {
++      return sizeCompare;
++    }
++
++    Collections.sort(thisFieldList);
++    Collections.sort(otherFieldList);
++
++    for (int index = 0; index < thisFieldList.size(); index++) {
++      int nameCompare = thisFieldList.get(index).compareTo(otherFieldList.get(index));
++
++      if (nameCompare != 0) {
++        return nameCompare;
++      }
++    }
++
++    return 0;
++  }
++
++  private static final Logger LOG = LoggerFactory.getLogger(DimensionsDescriptor.class);
++}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a4718f02/library/src/test/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistryTest.java
----------------------------------------------------------------------
diff --cc library/src/test/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistryTest.java
index 0000000,0000000..5c4feed
new file mode 100644
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistryTest.java
@@@ -1,0 -1,0 +1,87 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package com.datatorrent.lib.dimensions;
++
++import org.junit.Assert;
++import org.junit.Test;
++
++import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
++import com.datatorrent.lib.appdata.schemas.TimeBucket;
++import com.datatorrent.lib.dimensions.CustomTimeBucketRegistry;
++
++
++/**
++ * Tests for {@link CustomTimeBucketRegistry}: explicit-id registration, lookups by id, and
++ * automatic id assignment.
++ */
++public class CustomTimeBucketRegistryTest
++{
++  /**
++   * The unit time buckets registered by every test, each under its ordinal.
++   */
++  private static final TimeBucket[] UNIT_BUCKETS = {TimeBucket.MINUTE, TimeBucket.HOUR, TimeBucket.DAY};
++
++  /**
++   * Creates a unit {@link CustomTimeBucket} for each given time bucket. Each one is registered
++   * under its time bucket's ordinal.
++   *
++   * @return The created custom time buckets, in the same order as {@code timeBuckets}.
++   */
++  private static CustomTimeBucket[] registerUnitBuckets(CustomTimeBucketRegistry registry,
++      TimeBucket... timeBuckets)
++  {
++    CustomTimeBucket[] created = new CustomTimeBucket[timeBuckets.length];
++
++    for (int i = 0; i < timeBuckets.length; i++) {
++      created[i] = new CustomTimeBucket(timeBuckets[i]);
++    }
++
++    for (int i = 0; i < timeBuckets.length; i++) {
++      registry.register(created[i], timeBuckets[i].ordinal());
++    }
++
++    return created;
++  }
++
++  @Test
++  public void testBuildingRegistry()
++  {
++    CustomTimeBucketRegistry registry = new CustomTimeBucketRegistry();
++    CustomTimeBucket[] unitBuckets = registerUnitBuckets(registry, UNIT_BUCKETS);
++
++    for (TimeBucket expected : UNIT_BUCKETS) {
++      CustomTimeBucket lookedUp = registry.getTimeBucket(expected.ordinal());
++      Assert.assertTrue(lookedUp.isUnit());
++      Assert.assertEquals(expected, lookedUp.getTimeBucket());
++    }
++
++    for (int i = 0; i < UNIT_BUCKETS.length; i++) {
++      Assert.assertEquals(UNIT_BUCKETS[i].ordinal(), (int)registry.getTimeBucketId(unitBuckets[i]));
++    }
++  }
++
++  @Test
++  public void testRegister()
++  {
++    CustomTimeBucketRegistry registry = new CustomTimeBucketRegistry();
++    registerUnitBuckets(registry, UNIT_BUCKETS);
++
++    int largestId = Integer.MIN_VALUE;
++    for (TimeBucket timeBucket : UNIT_BUCKETS) {
++      largestId = Math.max(largestId, timeBucket.ordinal());
++    }
++
++    CustomTimeBucket fiveMinutes = new CustomTimeBucket(TimeBucket.MINUTE, 5L);
++    registry.register(fiveMinutes);
++
++    Assert.assertEquals(largestId + 1, (int)registry.getTimeBucketId(fiveMinutes));
++  }
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a4718f02/library/src/test/java/com/datatorrent/lib/dimensions/DimensionsDescriptorTest.java
----------------------------------------------------------------------
diff --cc library/src/test/java/com/datatorrent/lib/dimensions/DimensionsDescriptorTest.java
index 0000000,0000000..54682b1
new file mode 100644
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/dimensions/DimensionsDescriptorTest.java
@@@ -1,0 -1,0 +1,101 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package com.datatorrent.lib.dimensions;
++
++import java.util.Set;
++import java.util.concurrent.TimeUnit;
++
++import com.google.common.collect.Sets;
++
++import org.junit.Assert;
++import org.junit.Test;
++
++import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
++import com.datatorrent.lib.appdata.schemas.Fields;
++import com.datatorrent.lib.appdata.schemas.TimeBucket;
++import com.datatorrent.lib.appdata.schemas.Type;
++import com.datatorrent.lib.dimensions.DimensionsDescriptor;
++
++/**
++ * Tests for {@link DimensionsDescriptor}: parsing of aggregation strings and the
++ * equals/hashCode contract.
++ */
++public class DimensionsDescriptorTest
++{
++  public static final String KEY_1_NAME = "key1";
++  public static final Type KEY_1_TYPE = Type.INTEGER;
++  public static final String KEY_2_NAME = "key2";
++  public static final Type KEY_2_TYPE = Type.STRING;
++
++  public static final String AGG_1_NAME = "agg1";
++  public static final Type AGG_1_TYPE = Type.INTEGER;
++  public static final String AGG_2_NAME = "agg2";
++  public static final Type AGG_2_TYPE = Type.STRING;
++
++  @Test
++  public void simpleTest1()
++  {
++    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME);
++
++    Set<String> fields = Sets.newHashSet();
++    fields.add(KEY_1_NAME);
++
++    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
++    Assert.assertNull("The timeunit should be null.", ad.getTimeBucket());
++  }
++
++  @Test
++  public void simpleTest2()
++  {
++    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME +
++                                                       DimensionsDescriptor.DELIMETER_SEPERATOR +
++                                                       KEY_2_NAME);
++
++    Set<String> fields = Sets.newHashSet();
++    fields.add(KEY_1_NAME);
++    fields.add(KEY_2_NAME);
++
++    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
++    Assert.assertNull("The timeunit should be null.", ad.getTimeBucket());
++  }
++
++  @Test
++  public void simpleTimeTest()
++  {
++    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME +
++                                                       DimensionsDescriptor.DELIMETER_SEPERATOR +
++                                                       DimensionsDescriptor.DIMENSION_TIME +
++                                                       DimensionsDescriptor.DELIMETER_EQUALS +
++                                                       "DAYS");
++
++    Set<String> fields = Sets.newHashSet();
++    fields.add(KEY_1_NAME);
++
++    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
++    Assert.assertEquals("The timeunit should be DAYS.", TimeUnit.DAYS, ad.getTimeBucket().getTimeUnit());
++  }
++
++  @Test
++  public void equalsAndHashCodeTest()
++  {
++    DimensionsDescriptor ddA = new DimensionsDescriptor(new CustomTimeBucket(TimeBucket.MINUTE, 5L),
++                                                        new Fields(Sets.newHashSet("a", "b")));
++
++    DimensionsDescriptor ddB = new DimensionsDescriptor(new CustomTimeBucket(TimeBucket.MINUTE, 5L),
++                                                        new Fields(Sets.newHashSet("a", "b")));
++
++    DimensionsDescriptor ddC = new DimensionsDescriptor(new CustomTimeBucket(TimeBucket.MINUTE, 5L),
++                                                        new Fields(Sets.newHashSet("a", "c")));
++
++    // equals must be symmetric, and equal objects must have equal hash codes.
++    Assert.assertTrue(ddB.equals(ddA));
++    Assert.assertTrue(ddA.equals(ddB));
++    Assert.assertEquals(ddA.hashCode(), ddB.hashCode());
++
++    Assert.assertFalse(ddA.equals(ddC));
++    Assert.assertFalse(ddA.equals(null));
++  }
++}