Posted to commits@apex.apache.org by ch...@apache.org on 2016/08/24 08:01:44 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2100 Implementation of Inner Join operator

Repository: apex-malhar
Updated Branches:
  refs/heads/master e44caa5a5 -> 822323d02


APEXMALHAR-2100 Implementation of Inner Join operator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/fe2da3e9
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/fe2da3e9
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/fe2da3e9

Branch: refs/heads/master
Commit: fe2da3e96ade8eaef8bce38237b8147adfffa494
Parents: 255bc11
Author: chaitanya <ch...@apache.org>
Authored: Tue Aug 23 14:02:52 2016 +0530
Committer: chaitanya <ch...@apache.org>
Committed: Tue Aug 23 14:02:52 2016 +0530

----------------------------------------------------------------------
 .../lib/join/AbstractInnerJoinOperator.java     | 340 ++++++++++++++++++
 .../AbstractManagedStateInnerJoinOperator.java  | 253 +++++++++++++
 .../apex/malhar/lib/join/JoinStreamCodec.java   |  46 +++
 .../malhar/lib/join/POJOInnerJoinOperator.java  | 246 +++++++++++++
 .../state/managed/AbstractManagedStateImpl.java |   4 +-
 .../managed/ManagedTimeStateMultiValue.java     | 353 +++++++++++++++++++
 .../lib/join/POJOInnerJoinOperatorTest.java     | 351 ++++++++++++++++++
 .../lib/join/POJOPartitionJoinOperatorTest.java | 194 ++++++++++
 8 files changed, 1786 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java
new file mode 100644
index 0000000..dd58ea2
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * <p>
+ * An abstract implementation of the inner join operator. The operator receives tuples from two
+ * streams, applies the join operation based on the join constraint, and emits the joined value.
+ * Concrete classes should provide implementations for the extractKey, extractTime, and
+ * mergeTuples methods.
+ *
+ * <b>Properties:</b><br>
+ * <b>includeFieldStr</b>: Comma-separated lists of fields to be added to the output tuple,
+ *                         one list per stream, separated by a semicolon.
+ *                         Ex: Field1,Field2;Field3,Field4<br>
+ * <b>leftKeyExpression</b>: Key field expression for stream1.<br>
+ * <b>rightKeyExpression</b>: Key field expression for stream2.<br>
+ * <b>timeFields</b>: Comma-separated list of time fields, one per stream. Ex: Field1,Field2<br>
+ * <b>expiryTime</b>: Expiry time in milliseconds for tuples stored from both streams.<br>
+ * <b>isLeftKeyPrimary</b>: Specifies whether the left key (stream1 key) is primary or not.<br>
+ * <b>isRightKeyPrimary</b>: Specifies whether the right key (stream2 key) is primary or not.<br>
+ *
+ * <b>Example:</b><br>
+ *  The left input port receives customer details and the right input port receives order details.
+ *  The schema for Customer is of the form {ID, Name, CTime}; the schema for Order is of the form
+ *  {OID, CID, OTime}. Suppose the tuples of the Customer and Order streams must be joined where
+ *  Customer.ID = Order.CID, with the constraint that matched tuples have timestamps within
+ *  5 minutes of each other. Then leftKeyExpression = ID, rightKeyExpression = CID,
+ *  timeFields = CTime,OTime and expiryTime = 5 minutes.<br>
+ *
+ *  @displayName Abstract Inner Join Operator
+ *  @tags join
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator
+{
+  @NotNull
+  private String leftKeyExpression;
+  @NotNull
+  private String rightKeyExpression;
+  protected transient String[][] includeFields;
+  protected transient List<String> keyFieldExpressions;
+  protected transient List<String> timeFields;
+  @AutoMetric
+  private long tuplesJoinedPerSec;
+  private double windowTimeSec;
+  private int tuplesCount;
+  @NotNull
+  private String includeFieldStr;
+  private String timeFieldsStr;
+  @NotNull
+  private Long expiryTime;
+  private boolean isLeftKeyPrimary = false;
+  private boolean isRightKeyPrimary = false;
+  protected SpillableComplexComponent component;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data;
+
+  /**
+   * Process a tuple received on one of the input ports, with the following steps:
+   * 1) Extract the key from the given tuple.
+   * 2) Insert (key, tuple) into the store, where the store is stream1Data if the tuple
+   * was received from stream1, or stream2Data otherwise.
+   * 3) Look up the values for the key in the opposite store.
+   * 4) Merge the given tuple with each value found in step (3).
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   */
+  protected void processTuple(T tuple, boolean isStream1Data)
+  {
+    Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
+    K key = extractKey(tuple,isStream1Data);
+    if (!store.put(key, tuple)) {
+      return;
+    }
+    Spillable.SpillableByteArrayListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data;
+    joinStream(tuple,isStream1Data, valuestore.get(key));
+  }
+
+  /**
+   * Merge the given tuple and list of values.
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   * @param value list of tuples
+   */
+  protected void joinStream(T tuple, boolean isStream1Data, List<T> value)
+  {
+    // Join the input tuple with the joined tuples
+    if (value != null) {
+      for (T joinedValue : value) {
+        T result = isStream1Data ? mergeTuples(tuple, joinedValue) :
+            mergeTuples(joinedValue, tuple);
+        if (result != null) {
+          tuplesCount++;
+          emitTuple(result);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    component = new SpillableComplexComponentImpl(new InMemSpillableStateStore());
+    if (stream1Data == null && stream2Data == null) {
+      createStores();
+    }
+    component.setup(context);
+    keyFieldExpressions = Arrays.asList(leftKeyExpression,rightKeyExpression);
+    if (timeFieldsStr != null) {
+      timeFields = Arrays.asList(timeFieldsStr.split(","));
+    }
+    String[] streamFields = includeFieldStr.split(";");
+    includeFields = new String[2][];
+    for (int i = 0; i < streamFields.length; i++) {
+      includeFields[i] = streamFields[i].split(",");
+    }
+    windowTimeSec = (context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+      context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * 1.0) / 1000.0;
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    component.beginWindow(windowId);
+    tuplesJoinedPerSec = 0;
+    tuplesCount = 0;
+  }
+
+  @Override
+  public void endWindow()
+  {
+    component.endWindow();
+    tuplesJoinedPerSec = (long)(tuplesCount / windowTimeSec);
+  }
+
+  @Override
+  public void teardown()
+  {
+    component.teardown();
+  }
+
+  /**
+   * Extract the key from the given tuple
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   * @return the key
+   */
+  public abstract K extractKey(T tuple, boolean isStream1Data);
+
+  /**
+   * Extract the time from the given tuple
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   * @return the time
+   */
+  public abstract long extractTime(T tuple, boolean isStream1Data);
+
+  /**
+   * Merge the given tuples
+   * @param tuple1 tuple belonging to stream1
+   * @param tuple2 tuple belonging to stream2
+   * @return the merged tuple
+   */
+  public abstract T mergeTuples(T tuple1, T tuple2);
+
+  /**
+   * Emit the given tuple
+   * @param tuple given tuple
+   */
+  public abstract void emitTuple(T tuple);
+
+  /**
+   * Create stores for both the streams
+   */
+  public void createStores()
+  {
+    stream1Data = component.newSpillableByteArrayListMultimap(0,null,null);
+    stream2Data = component.newSpillableByteArrayListMultimap(0,null,null);
+  }
+
+  /**
+   * Get the left key expression
+   * @return the leftKeyExpression
+   */
+  public String getLeftKeyExpression()
+  {
+    return leftKeyExpression;
+  }
+
+  /**
+   * Set the left key expression
+   * @param leftKeyExpression given leftKeyExpression
+   */
+  public void setLeftKeyExpression(String leftKeyExpression)
+  {
+    this.leftKeyExpression = leftKeyExpression;
+  }
+
+  /**
+   * Get the right key expression
+   * @return the rightKeyExpression
+   */
+  public String getRightKeyExpression()
+  {
+    return rightKeyExpression;
+  }
+
+  /**
+   * Set the right key expression
+   * @param rightKeyExpression given rightKeyExpression
+   */
+  public void setRightKeyExpression(String rightKeyExpression)
+  {
+    this.rightKeyExpression = rightKeyExpression;
+  }
+
+  /**
+   * Return the include fields of the two streams
+   * @return the includeFieldStr
+   */
+  public String getIncludeFieldStr()
+  {
+    return includeFieldStr;
+  }
+
+  /**
+   * Set the comma-separated lists of fields to be added to the output tuple.
+   * @param includeFieldStr given includeFieldStr
+   */
+  public void setIncludeFieldStr(@NotNull String includeFieldStr)
+  {
+    this.includeFieldStr = Preconditions.checkNotNull(includeFieldStr);
+  }
+
+  /**
+   * Return the time fields for both the streams
+   * @return the timeFieldsStr
+   */
+  public String getTimeFieldsStr()
+  {
+    return timeFieldsStr;
+  }
+
+  /**
+   * Set the comma-separated time fields for both the streams
+   * @param timeFieldsStr given timeFieldsStr
+   */
+  public void setTimeFieldsStr(String timeFieldsStr)
+  {
+    this.timeFieldsStr = timeFieldsStr;
+  }
+
+  /**
+   * Returns the expiry time
+   * @return the expiryTime
+   */
+  public Long getExpiryTime()
+  {
+    return expiryTime;
+  }
+
+  /**
+   * Sets the expiry time
+   * @param expiryTime given expiryTime
+   */
+  public void setExpiryTime(@NotNull Long expiryTime)
+  {
+    this.expiryTime = Preconditions.checkNotNull(expiryTime);
+  }
+
+  /**
+   * Return whether the left key is primary or not
+   * @return the isLeftKeyPrimary
+   */
+  public boolean isLeftKeyPrimary()
+  {
+    return isLeftKeyPrimary;
+  }
+
+  /**
+   * Set the leftKeyPrimary
+   * @param leftKeyPrimary given leftKeyPrimary
+   */
+  public void setLeftKeyPrimary(boolean leftKeyPrimary)
+  {
+    isLeftKeyPrimary = leftKeyPrimary;
+  }
+
+  /**
+   * Return whether the right key is primary or not
+   * @return the isRightKeyPrimary
+   */
+  public boolean isRightKeyPrimary()
+  {
+    return isRightKeyPrimary;
+  }
+
+  /**
+   * Set the rightKeyPrimary
+   * @param rightKeyPrimary given rightKeyPrimary
+   */
+  public void setRightKeyPrimary(boolean rightKeyPrimary)
+  {
+    isRightKeyPrimary = rightKeyPrimary;
+  }
+}
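
A minimal sketch (not part of this commit) of a concrete subclass, assuming tuples are String
arrays of the form {key, payload, timestamp}; the class name, array layout and output port are
hypothetical, and input ports plus property configuration are omitted for brevity:

import org.apache.apex.malhar.lib.join.AbstractInnerJoinOperator;

import com.datatorrent.api.DefaultOutputPort;

public class StringArrayInnerJoinOperator extends AbstractInnerJoinOperator<String, String[]>
{
  public final transient DefaultOutputPort<String[]> output = new DefaultOutputPort<>();

  @Override
  public String extractKey(String[] tuple, boolean isStream1Data)
  {
    return tuple[0];  // both streams carry the join key in slot 0
  }

  @Override
  public long extractTime(String[] tuple, boolean isStream1Data)
  {
    return Long.parseLong(tuple[2]);  // event time in slot 2
  }

  @Override
  public String[] mergeTuples(String[] tuple1, String[] tuple2)
  {
    // key plus the payload of each side; a full implementation would honor includeFields
    return new String[]{tuple1[0], tuple1[1], tuple2[1]};
  }

  @Override
  public void emitTuple(String[] tuple)
  {
    output.emit(tuple);
  }
}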

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
new file mode 100644
index 0000000..dbf903d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.join;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+
+/**
+ * An abstract implementation of the inner join operator over managed state, extending
+ * AbstractInnerJoinOperator.
+ *
+ * <b>Properties:</b><br>
+ * <b>noOfBuckets</b>: Number of buckets required for managed state.<br>
+ * <b>bucketSpanTime</b>: Indicates the length of the time bucket.<br>
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends AbstractInnerJoinOperator<K,T> implements
+    Operator.CheckpointNotificationListener, Operator.IdleTimeHandler
+{
+  public static final String stateDir = "managedState";
+  public static final String stream1State = "stream1Data";
+  public static final String stream2State = "stream2Data";
+  private transient Map<JoinEvent<K,T>, Future<List>> waitingEvents = Maps.newLinkedHashMap();
+  private int noOfBuckets = 1;
+  private Long bucketSpanTime;
+  protected ManagedTimeStateImpl stream1Store;
+  protected ManagedTimeStateImpl stream2Store;
+
+  /**
+   * Create Managed states and stores for both the streams.
+   */
+  @Override
+  public void createStores()
+  {
+    stream1Store = new ManagedTimeStateImpl();
+    stream2Store = new ManagedTimeStateImpl();
+    stream1Store.setNumBuckets(noOfBuckets);
+    stream2Store.setNumBuckets(noOfBuckets);
+    if (bucketSpanTime != null) {
+      stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+      stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+    }
+    stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
+    stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
+
+    stream1Data = new ManagedTimeStateMultiValue(stream1Store, !isLeftKeyPrimary());
+    stream2Data = new ManagedTimeStateMultiValue(stream2Store, !isRightKeyPrimary());
+  }
+
+  /**
+   * Process a tuple received on one of the input ports, with the following steps:
+   * 1) Extract the key from the given tuple.
+   * 2) Insert (key, tuple) into the store, where the store is stream1Data if the tuple
+   * was received from stream1, or stream2Data otherwise.
+   * 3) Asynchronously look up the values for the key in the opposite store.
+   * 4) If the future is already done, merge the given tuple with the values found in step (3);
+   *    otherwise add the event to waitingEvents.
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   */
+  @Override
+  protected void processTuple(T tuple, boolean isStream1Data)
+  {
+    Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
+    K key = extractKey(tuple,isStream1Data);
+    long timeBucket = extractTime(tuple,isStream1Data);
+    if (!((ManagedTimeStateMultiValue)store).put(key, tuple,timeBucket)) {
+      return;
+    }
+    Spillable.SpillableByteArrayListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data;
+    Future<List> future = ((ManagedTimeStateMultiValue)valuestore).getAsync(key);
+    if (future.isDone()) {
+      try {
+        joinStream(tuple,isStream1Data, future.get());
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      waitingEvents.put(new JoinEvent<>(key,tuple,isStream1Data),future);
+    }
+  }
+
+  @Override
+  public void handleIdleTime()
+  {
+    if (waitingEvents.size() > 0) {
+      processWaitEvents(false);
+    }
+  }
+
+  @Override
+  public void beforeCheckpoint(long l)
+  {
+    stream1Store.beforeCheckpoint(l);
+    stream2Store.beforeCheckpoint(l);
+  }
+
+  @Override
+  public void checkpointed(long l)
+  {
+    stream1Store.checkpointed(l);
+    stream2Store.checkpointed(l);
+  }
+
+  @Override
+  public void committed(long l)
+  {
+    stream1Store.committed(l);
+    stream2Store.committed(l);
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    ((FileAccessFSImpl)stream1Store.getFileAccess()).setBasePath(context.getValue(DAG.APPLICATION_PATH)
+        + Path.SEPARATOR + stateDir + Path.SEPARATOR + String.valueOf(context.getId()) + Path.SEPARATOR + stream1State);
+    ((FileAccessFSImpl)stream2Store.getFileAccess()).setBasePath(context.getValue(DAG.APPLICATION_PATH)
+        + Path.SEPARATOR + stateDir + Path.SEPARATOR + String.valueOf(context.getId()) + Path.SEPARATOR + stream2State);
+    stream1Store.getCheckpointManager().setRecoveryPath("managed_state_" + stream1State);
+    stream2Store.getCheckpointManager().setRecoveryPath("managed_state_" + stream2State);
+    stream1Store.setup(context);
+    stream2Store.setup(context);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    stream1Store.beginWindow(windowId);
+    stream2Store.beginWindow(windowId);
+    super.beginWindow(windowId);
+  }
+
+  /**
+   * Process the waiting events
+   * @param finalize whether or not to wait for the pending futures to complete
+   */
+  private void processWaitEvents(boolean finalize)
+  {
+    Iterator<Map.Entry<JoinEvent<K,T>, Future<List>>> waitIterator = waitingEvents.entrySet().iterator();
+    while (waitIterator.hasNext()) {
+      Map.Entry<JoinEvent<K,T>, Future<List>> waitingEvent = waitIterator.next();
+      Future<List> future = waitingEvent.getValue();
+      if (future.isDone() || finalize) {
+        try {
+          JoinEvent<K,T> event = waitingEvent.getKey();
+          joinStream(event.value,event.isStream1Data,future.get());
+        } catch (InterruptedException | ExecutionException e) {
+          throw new RuntimeException("end window", e);
+        }
+        waitIterator.remove();
+        if (!finalize) {
+          break;
+        }
+      }
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    processWaitEvents(true);
+    stream1Store.endWindow();
+    stream2Store.endWindow();
+    super.endWindow();
+  }
+
+  @Override
+  public void teardown()
+  {
+    stream1Store.teardown();
+    stream2Store.teardown();
+    super.teardown();
+  }
+
+  /**
+   * Return the number of buckets
+   * @return the noOfBuckets
+   */
+  public int getNoOfBuckets()
+  {
+    return noOfBuckets;
+  }
+
+  /**
+   * Set the number of buckets required for managed state
+   * @param noOfBuckets given noOfBuckets
+   */
+  public void setNoOfBuckets(int noOfBuckets)
+  {
+    this.noOfBuckets = noOfBuckets;
+  }
+
+  /**
+   * Return the bucketSpanTime
+   * @return the bucketSpanTime
+   */
+  public Long getBucketSpanTime()
+  {
+    return bucketSpanTime;
+  }
+
+  /**
+   * Sets the length of the time bucket required for managed state.
+   * @param bucketSpanTime given bucketSpanTime
+   */
+  public void setBucketSpanTime(Long bucketSpanTime)
+  {
+    this.bucketSpanTime = bucketSpanTime;
+  }
+
+  public static class JoinEvent<K,T>
+  {
+    public K key;
+    public T value;
+    public boolean isStream1Data;
+
+    public JoinEvent(K key, T value, boolean isStream1Data)
+    {
+      this.key = key;
+      this.value = value;
+      this.isStream1Data = isStream1Data;
+    }
+  }
+}
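
The control flow around getAsync above is the heart of this class: join immediately when the
managed-state read completes in memory, otherwise park the event and drain it during idle time or
at window end. A self-contained sketch of that pattern (hypothetical names, not part of this
commit):

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;

public class DeferredLookup<T>
{
  private final Map<T, Future<List>> pending = new LinkedHashMap<>();

  public void onLookup(T tuple, Future<List> future) throws Exception
  {
    if (future.isDone()) {
      join(tuple, future.get());   // fast path: the value was already in memory
    } else {
      pending.put(tuple, future);  // slow path: revisit once the read completes
    }
  }

  // finalize == true (window end): block on every outstanding read;
  // finalize == false (idle time): only consume reads that have completed.
  public void drain(boolean finalize) throws Exception
  {
    Iterator<Map.Entry<T, Future<List>>> it = pending.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<T, Future<List>> entry = it.next();
      if (finalize || entry.getValue().isDone()) {
        join(entry.getKey(), entry.getValue().get());
        it.remove();
      }
    }
  }

  protected void join(T tuple, List values)
  {
    // merging and emitting would happen here
  }
}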

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/join/JoinStreamCodec.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/JoinStreamCodec.java b/library/src/main/java/org/apache/apex/malhar/lib/join/JoinStreamCodec.java
new file mode 100644
index 0000000..7a34699
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/join/JoinStreamCodec.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.join;
+
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Stream codec based on keyExpression for POJO Inner Join Operator.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class JoinStreamCodec extends KryoSerializableStreamCodec<Object>
+{
+  private transient PojoUtils.Getter<Object, Object> getter;
+  private String keyExpression;
+
+  public JoinStreamCodec(String keyExpression)
+  {
+    this.keyExpression = keyExpression;
+  }
+
+  @Override
+  public int getPartition(Object o)
+  {
+    if (getter == null) {
+      getter = PojoUtils.createGetter(o.getClass(), keyExpression, Object.class);
+    }
+    return getter.get(o).hashCode();
+  }
+}
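
Because getPartition() hashes only the value of the configured key expression, tuples that agree
on the key always reach the same physical partition of the join operator. A usage sketch, reusing
the Order POJO from the tests in this commit:

JoinStreamCodec codec = new JoinStreamCodec("CID");
int p1 = codec.getPartition(new Order(102, 1, 300));  // CID = 1
int p2 = codec.getPartition(new Order(103, 1, 300));  // CID = 1
assert p1 == p2;  // equal keys hash to the same partition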

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperator.java
new file mode 100644
index 0000000..0b23808
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperator.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator which receives POJOs on both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator<Object,Object> implements Operator.ActivationListener<Context>
+{
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected transient Class<?> outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort<Object> input1 = new DefaultInputPort<Object>()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      inputFieldObjects[0].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple,true);
+    }
+
+    @Override
+    public StreamCodec<Object> getStreamCodec()
+    {
+      return getInnerJoinStreamCodec(true);
+    }
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort<Object> input2 = new DefaultInputPort<Object>()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      inputFieldObjects[1].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple,false);
+    }
+
+    @Override
+    public StreamCodec<Object> getStreamCodec()
+    {
+      return getInnerJoinStreamCodec(false);
+    }
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+      context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+    super.setup(context);
+    for (int i = 0; i < 2; i++) {
+      inputFieldObjects[i] = new FieldObjectMap();
+    }
+  }
+
+  /**
+   * Extract the time value from the given tuple
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   * @return the time in milliseconds
+   */
+  @Override
+  public long extractTime(Object tuple, boolean isStream1Data)
+  {
+    return timeFields == null ? time : (long)(isStream1Data ? inputFieldObjects[0].timeFieldGet.get(tuple) :
+          inputFieldObjects[1].timeFieldGet.get(tuple));
+  }
+
+  /**
+   * Create getters for the key and time fields and setters for the include fields.
+   */
+  private void generateSettersAndGetters()
+  {
+    for (int i = 0; i < 2; i++) {
+      Class inputClass = inputFieldObjects[i].inputClass;
+      try {
+        inputFieldObjects[i].keyGet = PojoUtils.createGetter(inputClass, keyFieldExpressions.get(i), Object.class);
+        if (timeFields != null && timeFields.size() == 2) {
+          Class timeField = ClassUtils.primitiveToWrapper(inputClass.getField(timeFields.get(i)).getType());
+          inputFieldObjects[i].timeFieldGet = PojoUtils.createGetter(inputClass, timeFields.get(i), timeField);
+        }
+        for (int j = 0; j < includeFields[i].length; j++) {
+          Class inputField = ClassUtils.primitiveToWrapper(inputClass.getField(includeFields[i][j]).getType());
+          Class outputField = ClassUtils.primitiveToWrapper(outputClass.getField(includeFields[i][j]).getType());
+          if (inputField != outputField) {
+            continue;
+          }
+          inputFieldObjects[i].fieldMap.put(PojoUtils.createGetter(inputClass, includeFields[i][j], inputField),
+              PojoUtils.createSetter(outputClass, includeFields[i][j], outputField));
+        }
+      } catch (NoSuchFieldException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Extract the key value from the given tuple
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   * @return the key object
+   */
+  @Override
+  public Object extractKey(Object tuple, boolean isStream1Data)
+  {
+    return isStream1Data ? inputFieldObjects[0].keyGet.get(tuple) :
+      inputFieldObjects[1].keyGet.get(tuple);
+  }
+
+  /**
+   * Merge the given tuples
+   * @param tuple1 tuple belonging to stream1
+   * @param tuple2 tuple belonging to stream2
+   * @return the merged output object
+   */
+  @Override
+  public Object mergeTuples(Object tuple1, Object tuple2)
+  {
+    Object o;
+    try {
+      o = outputClass.newInstance();
+      for (Map.Entry<PojoUtils.Getter,PojoUtils.Setter> g: inputFieldObjects[0].fieldMap.entrySet()) {
+        g.getValue().set(o, g.getKey().get(tuple1));
+      }
+      for (Map.Entry<PojoUtils.Getter,PojoUtils.Setter> g: inputFieldObjects[1].fieldMap.entrySet()) {
+        g.getValue().set(o, g.getKey().get(tuple2));
+      }
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+    return o;
+  }
+
+  /**
+   * Emit the given tuple through the outputPort
+   * @param tuple given tuple
+   */
+  @Override
+  public void emitTuple(Object tuple)
+  {
+    outputPort.emit(tuple);
+  }
+
+  @Override
+  public void activate(Context context)
+  {
+    generateSettersAndGetters();
+  }
+
+  @Override
+  public void deactivate()
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+    super.endWindow();
+    time += timeIncrement;
+  }
+
+  /**
+   * Returns the stream codec for the given stream
+   * @param isStream1data Specifies whether the codec is needed for stream1 or stream2.
+   * @return a JoinStreamCodec based on the corresponding key expression
+   */
+  private StreamCodec<Object> getInnerJoinStreamCodec(boolean isStream1data)
+  {
+    if (isStream1data) {
+      return new JoinStreamCodec(getLeftKeyExpression());
+    }
+    return new JoinStreamCodec(getRightKeyExpression());
+  }
+
+  private class FieldObjectMap
+  {
+    public Class<?> inputClass;
+    public PojoUtils.Getter keyGet;
+    public PojoUtils.Getter timeFieldGet;
+    public Map<PojoUtils.Getter,PojoUtils.Setter> fieldMap;
+
+    public FieldObjectMap()
+    {
+      fieldMap = new HashMap<>();
+    }
+  }
+}
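
A wiring sketch (not part of this commit) showing how the operator might be placed in an
application, assuming the Customer, Order and CustOrder POJOs from the tests below; CustomerInput
and OrderInput are hypothetical source operators, each with an output port:

import org.apache.apex.malhar.lib.join.POJOInnerJoinOperator;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.ConsoleOutputOperator;

public class InnerJoinApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    CustomerInput customers = dag.addOperator("Customers", new CustomerInput());  // hypothetical
    OrderInput orders = dag.addOperator("Orders", new OrderInput());              // hypothetical
    POJOInnerJoinOperator join = dag.addOperator("Join", new POJOInnerJoinOperator());
    ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());

    join.setLeftKeyExpression("ID");                // key field of Customer
    join.setRightKeyExpression("CID");              // key field of Order
    join.setIncludeFieldStr("ID,Name;OID,Amount");  // output fields taken from each stream
    join.setExpiryTime(10000L);                     // stored tuples expire after 10 seconds

    // every port is schemaRequired = true, so the tuple classes must be declared
    dag.setInputPortAttribute(join.input1, Context.PortContext.TUPLE_CLASS, Customer.class);
    dag.setInputPortAttribute(join.input2, Context.PortContext.TUPLE_CLASS, Order.class);
    dag.setOutputPortAttribute(join.outputPort, Context.PortContext.TUPLE_CLASS, CustOrder.class);

    dag.addStream("customers", customers.output, join.input1);
    dag.addStream("orders", orders.output, join.input2);
    dag.addStream("joined", join.outputPort, console.input);
  }
}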

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index 927a6df..25b3f8b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -172,6 +172,8 @@ public abstract class AbstractManagedStateImpl
   protected final transient ListMultimap<Long, ValueFetchTask> tasksPerBucketId =
       Multimaps.synchronizedListMultimap(ArrayListMultimap.<Long, ValueFetchTask>create());
 
+  protected Bucket.ReadSource asyncReadSource = Bucket.ReadSource.ALL;
+
   @Override
   public void setup(OperatorContext context)
   {
@@ -569,7 +571,7 @@ public abstract class AbstractManagedStateImpl
         synchronized (bucket) {
           //a particular bucket should only be handled by one thread at any point of time. Handling of bucket here
           //involves creating readers for the time buckets and de-serializing key/value from a reader.
-          Slice value = bucket.get(key, timeBucketId, Bucket.ReadSource.ALL);
+          Slice value = bucket.get(key, timeBucketId, this.managedState.asyncReadSource);
           managedState.tasksPerBucketId.remove(bucket.getBucketId(), this);
           return value;
         }
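
The change above makes the read source for asynchronous gets configurable: ReadSource.ALL also
consults the in-memory caches, while ReadSource.READERS goes straight to the persisted time-bucket
files. A client that already serves memory-resident values itself can opt out of the double
lookup, as ManagedTimeStateMultiValue does below; a sketch (only legal from the
org.apache.apex.malhar.lib.state.managed package, since the field is protected):

ManagedTimeStateImpl store = new ManagedTimeStateImpl();
store.asyncReadSource = Bucket.ReadSource.READERS;  // the default remains ReadSource.ALL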

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
new file mode 100644
index 0000000..fd7250d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Concrete implementation of SpillableByteArrayListMultimap, required by the join operator.
+ *
+ * <b>Properties:</b><br>
+ * <b>isKeyContainsMultiValue</b>: Specifies whether a key can have multiple values or not.<br>
+ * <b>timeBucket</b>: Specifies the length of the time bucket.
+ *
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableByteArrayListMultimap<K,V>
+{
+  private transient StreamCodec streamCodec = null;
+  private boolean isKeyContainsMultiValue = false;
+  private long timeBucket;
+  @NotNull
+  private ManagedTimeStateImpl store;
+
+  public ManagedTimeStateMultiValue()
+  {
+    if (streamCodec == null) {
+      streamCodec = new KryoSerializableStreamCodec();
+    }
+  }
+
+  public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, boolean isKeyContainsMultiValue)
+  {
+    this();
+    this.store = Preconditions.checkNotNull(store);
+    this.store.asyncReadSource = Bucket.ReadSource.READERS;
+    this.isKeyContainsMultiValue = isKeyContainsMultiValue;
+  }
+
+  /**
+   * Return the list of values from the store
+   * @param k given key
+   * @return list of values
+   */
+  @Override
+  public List<V> get(@Nullable K k)
+  {
+    List<V> value = null;
+    Slice valueSlice = store.getSync(getBucketId(k), streamCodec.toByteArray(k));
+    if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) {
+      return null;
+    }
+    if (isKeyContainsMultiValue) {
+      return (List<V>)streamCodec.fromByteArray(valueSlice);
+    }
+    value = new ArrayList<>();
+    value.add((V)streamCodec.fromByteArray(valueSlice));
+    return value;
+  }
+
+  /**
+   * Returns the future fetched from the store.
+   * @param k given key
+   * @return a CompositeFuture wrapping the asynchronous read
+   */
+  public CompositeFuture getAsync(@Nullable K k)
+  {
+    return new CompositeFuture(store.getAsync(getBucketId(k), streamCodec.toByteArray(k)));
+  }
+
+  @Override
+  public Set<K> keySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset<K> keys()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<V> values()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<Map.Entry<K, V>> entries()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<V> removeAll(@Nullable Object o)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear()
+  {
+    // no-op
+  }
+
+  @Override
+  public int size()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean containsKey(@Nullable Object o)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean containsValue(@Nullable Object o)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean containsEntry(@Nullable Object o, @Nullable Object o1)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Inserts the (k,v) into the store.
+   * @param k key
+   * @param v value
+   * @return true if the given (k,v) is successfully inserted into the store otherwise false.
+   */
+  @Override
+  public boolean put(@Nullable K k, @Nullable V v)
+  {
+    if (isKeyContainsMultiValue) {
+      Slice keySlice = streamCodec.toByteArray(k);
+      int bucketId = getBucketId(k);
+      Slice valueSlice = store.getSync(bucketId, keySlice);
+      List<V> listOb;
+      if (valueSlice == null || valueSlice.length == 0) {
+        listOb = new ArrayList<>();
+      } else {
+        listOb = (List<V>)streamCodec.fromByteArray(valueSlice);
+      }
+      listOb.add(v);
+      return insertInStore(bucketId, timeBucket, keySlice, streamCodec.toByteArray(listOb));
+    }
+    return insertInStore(getBucketId(k), timeBucket, streamCodec.toByteArray(k),streamCodec.toByteArray(v));
+  }
+
+  /**
+   * Inserts the (k,v) into the store using the specified timebucket.
+   * @param k key
+   * @param v value
+   * @param timeBucket timebucket
+   * @return true if the given (k,v) is successfully inserted into the store otherwise false.
+   */
+  public boolean put(@Nullable K k, @Nullable V v, long timeBucket)
+  {
+    if (isKeyContainsMultiValue) {
+      Slice keySlice = streamCodec.toByteArray(k);
+      int bucketId = getBucketId(k);
+      Slice valueSlice = store.getSync(bucketId, keySlice);
+      List<V> listOb;
+      if (valueSlice == null || valueSlice.length == 0) {
+        listOb = new ArrayList<>();
+      } else {
+        listOb = (List<V>)streamCodec.fromByteArray(valueSlice);
+      }
+      listOb.add(v);
+      return insertInStore(bucketId, timeBucket, keySlice, streamCodec.toByteArray(listOb));
+    }
+    return insertInStore(getBucketId(k), timeBucket, streamCodec.toByteArray(k),streamCodec.toByteArray(v));
+  }
+
+  /**
+   * Insert (keySlice,valueSlice) into the store using bucketId and timeBucket.
+   * @param bucketId bucket Id
+   * @param timeBucket time bucket
+   * @param keySlice key slice
+   * @param valueSlice value slice
+   * @return true if the given (keySlice,valueSlice) is successfully inserted into the
+   *         store otherwise false.
+   */
+  private boolean insertInStore(long bucketId, long timeBucket, Slice keySlice, Slice valueSlice)
+  {
+    long timeBucketId = store.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(timeBucket);
+    if (timeBucketId != -1) {
+      store.putInBucket(bucketId, timeBucketId, keySlice, valueSlice);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public boolean remove(@Nullable Object o, @Nullable Object o1)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean putAll(@Nullable K k, Iterable<? extends V> iterable)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean putAll(Multimap<? extends K, ? extends V> multimap)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<V> replaceValues(K k, Iterable<? extends V> iterable)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<K, Collection<V>> asMap()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  public int getBucketId(K k)
+  {
+    // mask the sign bit so a negative hashCode cannot produce a negative bucket id
+    return (k.hashCode() & Integer.MAX_VALUE) % store.getNumBuckets();
+  }
+
+  public long getTimeBucket()
+  {
+    return timeBucket;
+  }
+
+  public void setTimeBucket(long timeBucket)
+  {
+    this.timeBucket = timeBucket;
+  }
+
+  public StreamCodec getStreamCodec()
+  {
+    return streamCodec;
+  }
+
+  public void setStreamCodec(StreamCodec streamCodec)
+  {
+    this.streamCodec = streamCodec;
+  }
+
+  public class CompositeFuture implements Future<List>
+  {
+    public Future<Slice> slice;
+
+    public CompositeFuture(Future<Slice> slice)
+    {
+      this.slice = slice;
+    }
+
+    @Override
+    public boolean cancel(boolean b)
+    {
+      return slice.cancel(b);
+    }
+
+    @Override
+    public boolean isCancelled()
+    {
+      return slice.isCancelled();
+    }
+
+    @Override
+    public boolean isDone()
+    {
+      return slice.isDone();
+    }
+
+    /**
+     * Converts a single-element result into a list of values.
+     * @return the list of values
+     * @throws InterruptedException
+     * @throws ExecutionException
+     */
+    @Override
+    public List get() throws InterruptedException, ExecutionException
+    {
+      List<V> value = null;
+      Slice valueSlice = slice.get();
+      if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) {
+        return null;
+      }
+      if (isKeyContainsMultiValue) {
+        value = (List<V>)streamCodec.fromByteArray(valueSlice);
+      } else {
+        value = new ArrayList<>();
+        value.add((V)streamCodec.fromByteArray(valueSlice));
+      }
+      return value;
+    }
+
+    @Override
+    public List get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException
+    {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
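
A usage sketch (not part of this commit); the store setup is abbreviated:

ManagedTimeStateImpl store = new ManagedTimeStateImpl();
store.setNumBuckets(1);
// ... setup(context) and time-bucket configuration omitted ...
ManagedTimeStateMultiValue<Integer, String> map =
    new ManagedTimeStateMultiValue<>(store, true);  // true: a key may hold a list of values
long now = System.currentTimeMillis();
map.put(1, "first", now);   // read-modify-write: appends to the list stored for key 1
map.put(1, "second", now);
List<String> values = map.get(1);  // ["first", "second"]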

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/test/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperatorTest.java
new file mode 100644
index 0000000..0b31c32
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperatorTest.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.join;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.stram.engine.PortContext;
+
+public class POJOInnerJoinOperatorTest
+{
+  @Rule
+  public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo();
+  private static final String APPLICATION_PATH_PREFIX = "target/InnerJoinPOJOTest/";
+  private String applicationPath;
+  private Attribute.AttributeMap.DefaultAttributeMap attributes;
+  Context.OperatorContext context;
+
+  public static class Customer
+  {
+    public int ID;
+    public String Name;
+    public Customer()
+    {
+    }
+
+    public Customer(int ID, String name)
+    {
+      this.ID = ID;
+      Name = name;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Customer{" +
+        "ID=" + ID +
+        ", Name='" + Name + '\'' +
+        '}';
+    }
+  }
+
+  public static class Order
+  {
+    public int OID;
+    public int CID;
+    public int Amount;
+
+    public Order()
+    {
+    }
+
+    public Order(int OID, int CID, int amount)
+    {
+      this.OID = OID;
+      this.CID = CID;
+      Amount = amount;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Order{" +
+        "OID=" + OID +
+        ", CID=" + CID +
+        ", Amount=" + Amount +
+        '}';
+    }
+  }
+
+  public static class CustOrder
+  {
+    public int ID;
+    public String Name;
+    public int OID;
+    public int Amount;
+
+    public CustOrder()
+    {
+    }
+
+    @Override
+    public String toString()
+    {
+      return "{" +
+        "ID=" + ID +
+        ", Name='" + Name + '\'' +
+        ", OID=" + OID +
+        ", Amount=" + Amount +
+        '}';
+    }
+  }
+
+  @Before
+  public void beforeTest()
+  {
+    applicationPath =  OperatorContextTestHelper.getUniqueApplicationPath(APPLICATION_PATH_PREFIX);
+    attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+  }
+
+  @After
+  public void afterTest()
+  {
+    Path root = new Path(applicationPath);
+    try {
+      FileSystem fs = FileSystem.newInstance(root.toUri(), new Configuration());
+      fs.delete(root, true);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testInnerJoinOperator() throws IOException, InterruptedException
+  {
+    POJOInnerJoinOperator oper = new POJOInnerJoinOperator();
+    oper.setIncludeFieldStr("ID,Name;OID,Amount");
+    oper.setLeftKeyExpression("ID");
+    oper.setRightKeyExpression("CID");
+    oper.setExpiryTime(10000L);
+
+    oper.setup(context);
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, CustOrder.class);
+    oper.outputPort.setup(new PortContext(attributes,context));
+
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Customer.class);
+    oper.input1.setup(new PortContext(attributes,context));
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Order.class);
+    oper.input2.setup(new PortContext(attributes,context));
+    oper.activate(context);
+
+    CollectorTestSink<CustOrder> sink = new CollectorTestSink<>();
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+    oper.outputPort.setSink(tmp);
+    oper.beginWindow(0);
+
+    Customer tuple = new Customer(1, "Anil");
+
+    oper.input1.process(tuple);
+    Order order = new Order(102, 1, 300);
+
+    oper.input2.process(order);
+    Order order2 = new Order(103, 3, 300);
+    oper.input2.process(order2);
+    Order order3 = new Order(104, 7, 300);
+    oper.input2.process(order3);
+
+    oper.endWindow();
+
+    /* Number of tuples emitted */
+    Assert.assertEquals("Number of tuples emitted ", 1, sink.collectedTuples.size());
+    CustOrder emitted = sink.collectedTuples.iterator().next();
+
+    Assert.assertEquals("value of ID :", tuple.ID, emitted.ID);
+    Assert.assertEquals("value of Name :", tuple.Name, emitted.Name);
+
+    Assert.assertEquals("value of OID: ", order.OID, emitted.OID);
+    Assert.assertEquals("value of Amount: ", order.Amount, emitted.Amount);
+
+    oper.teardown();
+  }
+
+  @Test
+  public void testMultipleValues() throws IOException, InterruptedException
+  {
+    POJOInnerJoinOperator oper = new POJOInnerJoinOperator();
+    oper.setIncludeFieldStr("ID,Name;OID,Amount");
+    oper.setLeftKeyExpression("ID");
+    oper.setRightKeyExpression("CID");
+    oper.setExpiryTime(10000L);
+
+    oper.setup(context);
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, CustOrder.class);
+    oper.outputPort.setup(new PortContext(attributes,context));
+
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Customer.class);
+    oper.input1.setup(new PortContext(attributes,context));
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Order.class);
+    oper.input2.setup(new PortContext(attributes,context));
+    oper.activate(context);
+
+    CollectorTestSink<CustOrder> sink = new CollectorTestSink<>();
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+    oper.outputPort.setSink(tmp);
+
+    oper.beginWindow(0);
+    Order order = new Order(102, 1, 300);
+    oper.input2.process(order);
+
+    Order order2 = new Order(103, 3, 300);
+    oper.input2.process(order2);
+    oper.endWindow();
+    oper.beginWindow(1);
+
+    Order order3 = new Order(104, 1, 300);
+    oper.input2.process(order3);
+    Customer tuple = new Customer(1, "Anil");
+    oper.input1.process(tuple);
+    oper.endWindow();
+
+    /* Number of tuples emitted */
+    Assert.assertEquals("Number of tuples emitted ", 2, sink.collectedTuples.size());
+    CustOrder emitted = sink.collectedTuples.get(0);
+
+    Assert.assertEquals("value of ID :", tuple.ID, emitted.ID);
+    Assert.assertEquals("value of Name :", tuple.Name, emitted.Name);
+    Assert.assertEquals("value of OID: ", order.OID, emitted.OID);
+    Assert.assertEquals("value of Amount: ", order.Amount, emitted.Amount);
+
+    emitted = sink.collectedTuples.get(1);
+    Assert.assertEquals("value of ID :", tuple.ID, emitted.ID);
+    Assert.assertEquals("value of Name :", tuple.Name, emitted.Name);
+    Assert.assertEquals("value of OID: ", order3.OID, emitted.OID);
+    Assert.assertEquals("value of Amount: ", order3.Amount, emitted.Amount);
+    oper.teardown();
+  }
+
+  @Test
+  public void testUpdateStream1Values() throws IOException, InterruptedException
+  {
+    POJOInnerJoinOperator oper = new POJOInnerJoinOperator();
+    oper.setIncludeFieldStr("ID,Name;OID,Amount");
+    oper.setLeftKeyExpression("ID");
+    oper.setRightKeyExpression("CID");
+    oper.setLeftKeyPrimary(true);
+    oper.setExpiryTime(10000L);
+
+    oper.setup(context);
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, CustOrder.class);
+    oper.outputPort.setup(new PortContext(attributes,context));
+
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Customer.class);
+    oper.input1.setup(new PortContext(attributes,context));
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Order.class);
+    oper.input2.setup(new PortContext(attributes,context));
+    oper.activate(context);
+
+    CollectorTestSink<CustOrder> sink = new CollectorTestSink<>();
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+    oper.outputPort.setSink(tmp);
+
+    oper.beginWindow(0);
+    Customer tuple1 = new Customer(1, "Anil");
+    oper.input1.process(tuple1);
+    oper.endWindow();
+    oper.beginWindow(1);
+
+    Customer tuple2 = new Customer(1, "Join");
+    oper.input1.process(tuple2);
+    Order order = new Order(102, 1, 300);
+    oper.input2.process(order);
+    Order order2 = new Order(103, 3, 300);
+    oper.input2.process(order2);
+    oper.endWindow();
+
+    /* Number of tuples emitted */
+    Assert.assertEquals("Number of tuples emitted ", 1, sink.collectedTuples.size());
+    CustOrder emitted = sink.collectedTuples.get(0);
+
+    Assert.assertEquals("value of ID :", tuple2.ID, emitted.ID);
+    Assert.assertEquals("value of Name :", tuple2.Name, emitted.Name);
+    Assert.assertEquals("value of OID: ", order.OID, emitted.OID);
+    Assert.assertEquals("value of Amount: ", order.Amount, emitted.Amount);
+    oper.teardown();
+  }
+
+  @Test
+  public void testEmitMultipleTuplesFromStream2() throws IOException, InterruptedException
+  {
+    POJOInnerJoinOperator oper = new POJOInnerJoinOperator();
+    oper.setIncludeFieldStr("ID,Name;OID,Amount");
+    oper.setLeftKeyExpression("ID");
+    oper.setRightKeyExpression("CID");
+    oper.setLeftKeyPrimary(true);
+    oper.setExpiryTime(10000L);
+
+    oper.setup(context);
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, CustOrder.class);
+    oper.outputPort.setup(new PortContext(attributes,context));
+
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Customer.class);
+    oper.input1.setup(new PortContext(attributes,context));
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Order.class);
+    oper.input2.setup(new PortContext(attributes,context));
+    oper.activate(context);
+
+    CollectorTestSink<CustOrder> sink = new CollectorTestSink<>();
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+    oper.outputPort.setSink(tmp);
+
+    oper.beginWindow(0);
+    Customer tuple1 = new Customer(1, "Anil");
+    oper.input1.process(tuple1);
+    Order order = new Order(102, 1, 300);
+    oper.input2.process(order);
+    Order order2 = new Order(103, 1, 300);
+    oper.input2.process(order2);
+    oper.endWindow();
+
+    /* Number of tuples emitted */
+    Assert.assertEquals("Number of tuples emitted ", 2, sink.collectedTuples.size());
+    CustOrder emitted = sink.collectedTuples.get(0);
+
+    Assert.assertEquals("value of ID :", tuple1.ID, emitted.ID);
+    Assert.assertEquals("value of Name :", tuple1.Name, emitted.Name);
+    Assert.assertEquals("value of OID: ", order.OID, emitted.OID);
+    Assert.assertEquals("value of Amount: ", order.Amount, emitted.Amount);
+    emitted = sink.collectedTuples.get(1);
+    Assert.assertEquals("value of ID :", tuple1.ID, emitted.ID);
+    Assert.assertEquals("value of Name :", tuple1.Name, emitted.Name);
+    Assert.assertEquals("value of OID: ", order2.OID, emitted.OID);
+    Assert.assertEquals("value of Amount: ", order2.Amount, emitted.Amount);
+    oper.teardown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/test/java/org/apache/apex/malhar/lib/join/POJOPartitionJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/join/POJOPartitionJoinOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/join/POJOPartitionJoinOperatorTest.java
new file mode 100644
index 0000000..9f6161b
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/join/POJOPartitionJoinOperatorTest.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.join;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+public class POJOPartitionJoinOperatorTest
+{
+  public static final int NUM_OF_PARTITIONS = 4;
+  public static final int TOTAL_TUPLES_PROCESS = 1000;
+  // volatile: written from operator threads, read from the test thread
+  private static volatile boolean testFailed = false;
+
+  public static class PartitionTestJoinOperator extends POJOInnerJoinOperator implements StatsListener
+  {
+    public int operatorId;
+    HashMap<Integer, Integer> partitionMap = Maps.newHashMap();
+    transient CountDownLatch latch = new CountDownLatch(1);
+    int tuplesProcessed = 0;
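+    // Exposed as an auto-metric so that processStats(), which runs in the
+    // application master, can observe progress and release the latch.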
+    @AutoMetric
+    int tuplesProcessedCompletely = 0;
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      operatorId = context.getId();
+    }
+
+    @Override
+    protected void processTuple(Object tuple, boolean isStream1Data)
+    {
+      // Only verify key-to-partition routing for stream1 tuples
+      if (!isStream1Data) {
+        return;
+      }
+      int key = (int)extractKey(tuple, isStream1Data);
+      if (partitionMap.containsKey(key)) {
+        if (partitionMap.get(key) != operatorId) {
+          testFailed = true;
+        }
+      } else {
+        partitionMap.put(key, operatorId);
+      }
+      tuplesProcessed++;
+    }
+
+    @Override
+    public void endWindow()
+    {
+      super.endWindow();
+      tuplesProcessedCompletely = tuplesProcessed;
+    }
+
+    @Override
+    public StatsListener.Response processStats(StatsListener.BatchedOperatorStats stats)
+    {
+      List<Stats.OperatorStats> lastWindowedStats = stats.getLastWindowedStats();
+      if (lastWindowedStats == null || lastWindowedStats.isEmpty()) {
+        return null;
+      }
+      Stats.OperatorStats operatorStats = lastWindowedStats.get(lastWindowedStats.size() - 1);
+      Integer processed = (Integer)operatorStats.metrics.get("tuplesProcessedCompletely");
+      if (processed != null && processed >= TOTAL_TUPLES_PROCESS) {
+        latch.countDown();
+      }
+      return null;
+    }
+  }
+
+  public static class TestGenerator extends BaseOperator implements InputOperator
+  {
+    public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>();
+    private final transient Random r = new Random();
+
+    @Override
+    public void emitTuples()
+    {
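+      // Keys are drawn from [0, 100), so many tuples share a key; repeated
+      // keys are what let the test observe per-key routing across partitions.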
+      TestEvent event = new TestEvent();
+      event.id = r.nextInt(100);
+      output.emit(event);
+    }
+  }
+
+  public static class TestEvent
+  {
+    public int id;
+    public Date eventTime;
+
+    public TestEvent()
+    {
+    }
+
+    public int getId()
+    {
+      return id;
+    }
+
+    public void setId(int id)
+    {
+      this.id = id;
+    }
+
+    public Date getEventTime()
+    {
+      return eventTime;
+    }
+
+    public void setEventTime(Date eventTime)
+    {
+      this.eventTime = eventTime;
+    }
+  }
+
+  public static class JoinApp implements StreamingApplication
+  {
+    public PartitionTestJoinOperator joinOp;
+
+    @Override
+    public void populateDAG(DAG dag, Configuration configuration)
+    {
+      TestGenerator gen1 = dag.addOperator("Generator1", new TestGenerator());
+      TestGenerator gen2 = dag.addOperator("Generator2", new TestGenerator());
+
+      joinOp = dag.addOperator("Join", new PartitionTestJoinOperator());
+      joinOp.setLeftKeyExpression("id");
+      joinOp.setRightKeyExpression("id");
+      joinOp.setIncludeFieldStr("id,eventTime;id,eventTime");
+      joinOp.setExpiryTime(10000L);
+
+      ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());
+
+      dag.addStream("Gen1ToJoin", gen1.output, joinOp.input1);
+      dag.addStream("Gen2ToJoin", gen2.output, joinOp.input2);
+      dag.addStream("JoinToConsole", joinOp.outputPort, console.input);
+      dag.setInputPortAttribute(joinOp.input1, DAG.InputPortMeta.TUPLE_CLASS, TestEvent.class);
+      dag.setInputPortAttribute(joinOp.input2, DAG.InputPortMeta.TUPLE_CLASS, TestEvent.class);
+      dag.setOutputPortAttribute(joinOp.outputPort, DAG.InputPortMeta.TUPLE_CLASS, TestEvent.class);
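+      // Run the join as NUM_OF_PARTITIONS physical partitions; the operator's
+      // key-based stream codec must then route equal keys to one instance.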
+      dag.setAttribute(joinOp, Context.OperatorContext.PARTITIONER,
+          new StatelessPartitioner<PartitionTestJoinOperator>(NUM_OF_PARTITIONS));
+    }
+  }
+
+  /**
+   * Validates that every tuple with a given key is routed to exactly one partition.
+   */
+  @Test
+  public void testJoinOpStreamCodec() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    JoinApp app = new JoinApp();
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+    app.joinOp.latch.await();
+    lc.shutdown();
+    Assert.assertFalse(testFailed);
+  }
+
+}
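
The partition test above relies on key-based stream routing: the
JoinStreamCodec added in this commit derives the partition hint from the join
key, so every tuple with a given key reaches the same physical partition. A
minimal sketch of such a key-based codec (hypothetical class name, simplified;
not the committed JoinStreamCodec, which extracts the key via a configured
getter expression):

    import com.datatorrent.lib.codec.KryoSerializableStreamCodec;

    // Equal keys produce equal partition hints, so the engine routes them
    // to the same physical operator instance.
    public class KeyBasedStreamCodec
        extends KryoSerializableStreamCodec<POJOPartitionJoinOperatorTest.TestEvent>
    {
      @Override
      public int getPartition(POJOPartitionJoinOperatorTest.TestEvent tuple)
      {
        return tuple.getId();
      }
    }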


[2/2] apex-malhar git commit: Merge branch 'APEXMALHAR-2100-InnerJoin' of https://github.com/chaithu14/incubator-apex-malhar

Posted by ch...@apache.org.
Merge branch 'APEXMALHAR-2100-InnerJoin' of https://github.com/chaithu14/incubator-apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/822323d0
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/822323d0
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/822323d0

Branch: refs/heads/master
Commit: 822323d02725ddf24dfe725c4a57aa05b5d4b165
Parents: e44caa5 fe2da3e
Author: Chinmay Kolhatkar <ch...@datatorrent.com>
Authored: Wed Aug 24 13:30:34 2016 +0530
Committer: Chinmay Kolhatkar <ch...@datatorrent.com>
Committed: Wed Aug 24 13:30:34 2016 +0530

----------------------------------------------------------------------
 .../lib/join/AbstractInnerJoinOperator.java     | 340 ++++++++++++++++++
 .../AbstractManagedStateInnerJoinOperator.java  | 253 +++++++++++++
 .../apex/malhar/lib/join/JoinStreamCodec.java   |  46 +++
 .../malhar/lib/join/POJOInnerJoinOperator.java  | 246 +++++++++++++
 .../state/managed/AbstractManagedStateImpl.java |   4 +-
 .../managed/ManagedTimeStateMultiValue.java     | 353 +++++++++++++++++++
 .../lib/join/POJOInnerJoinOperatorTest.java     | 351 ++++++++++++++++++
 .../lib/join/POJOPartitionJoinOperatorTest.java | 194 ++++++++++
 8 files changed, 1786 insertions(+), 1 deletion(-)
----------------------------------------------------------------------