You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2016/08/24 08:01:44 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2100 Implementation of Inner
Join operator
Repository: apex-malhar
Updated Branches:
refs/heads/master e44caa5a5 -> 822323d02
APEXMALHAR-2100 Implementation of Inner Join operator
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/fe2da3e9
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/fe2da3e9
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/fe2da3e9
Branch: refs/heads/master
Commit: fe2da3e96ade8eaef8bce38237b8147adfffa494
Parents: 255bc11
Author: chaitanya <ch...@apache.org>
Authored: Tue Aug 23 14:02:52 2016 +0530
Committer: chaitanya <ch...@apache.org>
Committed: Tue Aug 23 14:02:52 2016 +0530
----------------------------------------------------------------------
.../lib/join/AbstractInnerJoinOperator.java | 340 ++++++++++++++++++
.../AbstractManagedStateInnerJoinOperator.java | 253 +++++++++++++
.../apex/malhar/lib/join/JoinStreamCodec.java | 46 +++
.../malhar/lib/join/POJOInnerJoinOperator.java | 246 +++++++++++++
.../state/managed/AbstractManagedStateImpl.java | 4 +-
.../managed/ManagedTimeStateMultiValue.java | 353 +++++++++++++++++++
.../lib/join/POJOInnerJoinOperatorTest.java | 351 ++++++++++++++++++
.../lib/join/POJOPartitionJoinOperatorTest.java | 194 ++++++++++
8 files changed, 1786 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java
new file mode 100644
index 0000000..dd58ea2
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * <p>
+ * An abstract implementation of inner join operator. Operator receives tuples from two streams,
+ * applies the join operation based on constraint and emit the joined value.
+ * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods.
+ *
+ * <b>Properties:</b><br>
+ * <b>includeFieldStr</b>: Fields to be added to the output tuple. The field lists of the two streams are
+ * separated by ';' and the fields within a list by ','. Ex: Field1,Field2;Field3,Field4<br>
+ * <b>leftKeyExpression</b>: key field expression for stream1.<br>
+ * <b>rightKeyExpression</b>: key field expression for stream2.<br>
+ * <b>timeFieldsStr</b>: Comma separated time fields, one per stream. Ex: Field1,Field2<br>
+ * <b>expiryTime</b>: Expiry time in milliseconds for stored tuples which come from both streams<br>
+ * <b>isLeftKeyPrimary</b>: Specifies whether the left key (stream1 key) is primary or not<br>
+ * <b>isRightKeyPrimary</b>: Specifies whether the right key (stream2 key) is primary or not<br>
+ *
+ * <b> Example: </b> <br>
+ * Left input port receives customer details and right input port receives Order details.
+ * Schema for the Customer be in the form of {ID, Name, CTime}
+ * Schema for the Order be in the form of {OID, CID, OTime}
+ * Now, Join the tuples of Customer and Order streams where Customer.ID = Order.CID and the constraint is
+ * matched tuples must have timestamp within 5 minutes.
+ * Here, leftKeyExpression = ID, rightKeyExpression = CID, timeFieldsStr = CTime,OTime
+ * and expiryTime = 5 minutes (expressed in milliseconds). <br>
+ *
+ * @displayName Abstract Inner Join Operator
+ * @tags join
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator
+{
+ @NotNull
+ private String leftKeyExpression; // key expression for stream1; entry 0 of keyFieldExpressions
+ @NotNull
+ private String rightKeyExpression; // key expression for stream2; entry 1 of keyFieldExpressions
+ protected transient String[][] includeFields; // built in setup() from includeFieldStr: one field list per stream
+ protected transient List<String> keyFieldExpressions; // built in setup() as [leftKeyExpression, rightKeyExpression]
+ protected transient List<String> timeFields; // assigned only in setup(), from timeFieldsStr.split(",")
+ @AutoMetric
+ private long tuplesJoinedPerSec; // recomputed in endWindow() as tuplesCount / windowTimeSec
+ private double windowTimeSec; // application window length in seconds, computed in setup()
+ private int tuplesCount; // joined tuples emitted in the current window; reset in beginWindow()
+ @NotNull
+ private String includeFieldStr; // raw property: ';' separates the streams, ',' separates the fields
+ private String timeFieldsStr; // raw property: comma separated time fields
+ @NotNull
+ private Long expiryTime; // expiry time of stored tuples, in milliseconds per the class Javadoc
+ private boolean isLeftKeyPrimary = false;
+ private boolean isRightKeyPrimary = false;
+ protected SpillableComplexComponent component; // created in setup(); window callbacks and teardown are forwarded to it
+ protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data; // key -> tuples received on stream1
+ protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data; // key -> tuples received on stream2
+
+ /**
+ * Process the tuple which are received from input ports with the following steps:
+ * 1) Extract key from the given tuple
+ * 2) Insert <key,tuple> into the store where store is the stream1Data if the tuple
+ * receives from stream1 or viceversa.
+ * 3) Get the values of the key if found it in opposite store
+ * 4) Merge the given tuple and values found from step (3)
+ * @param tuple given tuple
+ * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+ */
+ protected void processTuple(T tuple, boolean isStream1Data)
+ {
+ Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
+ K key = extractKey(tuple,isStream1Data);
+ if (!store.put(key, tuple)) {
+ return;
+ }
+ Spillable.SpillableByteArrayListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data;
+ joinStream(tuple,isStream1Data, valuestore.get(key));
+ }
+
+ /**
+ * Merge the given tuple and list of values.
+ * @param tuple given tuple
+ * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+ * @param value list of tuples
+ */
+ protected void joinStream(T tuple, boolean isStream1Data, List<T> value)
+ {
+ // Join the input tuple with the joined tuples
+ if (value != null) {
+ for (T joinedValue : value) {
+ T result = isStream1Data ? mergeTuples(tuple, joinedValue) :
+ mergeTuples(joinedValue, tuple);
+ if (result != null) {
+ tuplesCount++;
+ emitTuple(result);
+ }
+ }
+ }
+ }
+
+ /**
+  * Creates the spillable component, creates the stores if both are still null, and parses the
+  * string properties into keyFieldExpressions, timeFields and includeFields. Also derives the
+  * application window length in seconds, which endWindow() uses for the tuples-per-second metric.
+  * @param context operator context
+  * @throws IllegalArgumentException if includeFieldStr holds more than two ';' separated field lists
+  */
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+   component = new SpillableComplexComponentImpl(new InMemSpillableStateStore());
+   if (stream1Data == null && stream2Data == null) {
+     createStores();
+   }
+   component.setup(context);
+   keyFieldExpressions = Arrays.asList(leftKeyExpression, rightKeyExpression);
+   // timeFields is transient and assigned nowhere else, so the guard must test the configured
+   // string; testing timeFields itself meant the time fields were never parsed.
+   if (timeFieldsStr != null) {
+     timeFields = Arrays.asList(timeFieldsStr.split(","));
+   }
+   String[] streamFields = includeFieldStr.split(";");
+   Preconditions.checkArgument(streamFields.length <= 2,
+       "includeFieldStr must contain at most two ';' separated field lists: %s", includeFieldStr);
+   includeFields = new String[2][];
+   for (int i = 0; i < includeFields.length; i++) {
+     // A stream without a field list gets an empty array rather than null, so code that
+     // iterates includeFields[i] does not have to null-check.
+     includeFields[i] = i < streamFields.length ? streamFields[i].split(",") : new String[0];
+   }
+   windowTimeSec = (context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+       context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * 1.0) / 1000.0;
+ }
+
+ /**
+  * Forwards the window start to the spillable component and clears the per-window join statistics.
+  * @param windowId id of the window that begins
+  */
+ @Override
+ public void beginWindow(long windowId)
+ {
+   component.beginWindow(windowId);
+   tuplesCount = 0;
+   tuplesJoinedPerSec = 0;
+ }
+
+ /**
+  * Forwards the window end to the spillable component and publishes the join rate of the window
+  * (tuples joined divided by the window length in seconds, truncated to a long).
+  */
+ @Override
+ public void endWindow()
+ {
+   component.endWindow();
+   final double joinedPerSec = tuplesCount / windowTimeSec;
+   tuplesJoinedPerSec = (long)joinedPerSec;
+ }
+
+ /**
+  * Tears down the spillable component created in setup().
+  */
+ @Override
+ public void teardown()
+ {
+   this.component.teardown();
+ }
+
+ /**
+ * Extract the join key from the given tuple; called by processTuple for every incoming tuple.
+ * @param tuple given tuple
+ * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+ * @return the key under which the tuple is stored and looked up in the opposite stream
+ */
+ public abstract K extractKey(T tuple, boolean isStream1Data);
+
+ /**
+ * Extract the time from the given tuple; this class declares the method but does not call it.
+ * @param tuple given tuple
+ * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+ * @return the time
+ */
+ public abstract long extractTime(T tuple, boolean isStream1Data);
+
+ /**
+ * Merge the given tuples; joinStream emits the result unless it is null.
+ * @param tuple1 tuple belongs to stream1
+ * @param tuple2 tuple belongs to stream2
+ * @return the merged tuple, or null when nothing should be emitted for this pair
+ */
+ public abstract T mergeTuples(T tuple1, T tuple2);
+
+ /**
+ * Emit the given joined tuple; called by joinStream for every non-null merge result.
+ * @param tuple given tuple
+ */
+ public abstract void emitTuple(T tuple);
+
+ /**
+  * Creates one spillable list-multimap per stream from the component built in setup().
+  * setup() calls this only while both stores are still null; subclasses may override it to
+  * plug in a different store implementation.
+  */
+ public void createStores()
+ {
+   this.stream1Data = component.newSpillableByteArrayListMultimap(0, null, null);
+   this.stream2Data = component.newSpillableByteArrayListMultimap(0, null, null);
+ }
+
+ /**
+  * Returns the key field expression configured for stream1.
+  * @return the leftKeyExpression
+  */
+ public String getLeftKeyExpression()
+ {
+   return this.leftKeyExpression;
+ }
+
+ /**
+ * Set the key field expression used for stream1 tuples.
+ * @param leftKeyExpression given leftKeyExpression
+ */
+ public void setLeftKeyExpression(String leftKeyExpression)
+ {
+ this.leftKeyExpression = leftKeyExpression;
+ }
+
+ /**
+  * Returns the key field expression configured for stream2.
+  * @return the rightKeyExpression
+  */
+ public String getRightKeyExpression()
+ {
+   return this.rightKeyExpression;
+ }
+
+ /**
+ * Set the key field expression used for stream2 tuples.
+ * @param rightKeyExpression given rightKeyExpression
+ */
+ public void setRightKeyExpression(String rightKeyExpression)
+ {
+ this.rightKeyExpression = rightKeyExpression;
+ }
+
+ /**
+  * Returns the raw include-fields property covering both streams.
+  * @return the includeFieldStr
+  */
+ public String getIncludeFieldStr()
+ {
+   return this.includeFieldStr;
+ }
+
+ /**
+  * Sets the fields to be added to the output tuple. setup() splits the value on ';' into one
+  * list per stream and each list on ','.
+  * @param includeFieldStr given includeFieldStr; must not be null
+  * @throws NullPointerException if includeFieldStr is null
+  */
+ public void setIncludeFieldStr(@NotNull String includeFieldStr)
+ {
+   Preconditions.checkNotNull(includeFieldStr);
+   this.includeFieldStr = includeFieldStr;
+ }
+
+ /**
+  * Returns the raw, comma separated time fields property for both streams.
+  * @return the timeFieldsStr
+  */
+ public String getTimeFieldsStr()
+ {
+   return this.timeFieldsStr;
+ }
+
+ /**
+ * Set the time fields of both streams as one comma separated string. Ex: Field1,Field2
+ * @param timeFieldsStr given timeFieldsStr
+ */
+ public void setTimeFieldsStr(String timeFieldsStr)
+ {
+ this.timeFieldsStr = timeFieldsStr;
+ }
+
+ /**
+  * Returns the configured expiry time of stored tuples.
+  * @return the expiryTime
+  */
+ public Long getExpiryTime()
+ {
+   return this.expiryTime;
+ }
+
+ /**
+  * Sets the expiry time of stored tuples.
+  * @param expiryTime given expiryTime; must not be null
+  * @throws NullPointerException if expiryTime is null
+  */
+ public void setExpiryTime(@NotNull Long expiryTime)
+ {
+   Preconditions.checkNotNull(expiryTime);
+   this.expiryTime = expiryTime;
+ }
+
+ /**
+  * Tells whether the stream1 key is flagged as primary.
+  * @return the isLeftKeyPrimary
+  */
+ public boolean isLeftKeyPrimary()
+ {
+   return this.isLeftKeyPrimary;
+ }
+
+ /**
+  * Flags the stream1 key as primary or not.
+  * @param leftKeyPrimary given leftKeyPrimary
+  */
+ public void setLeftKeyPrimary(boolean leftKeyPrimary)
+ {
+   this.isLeftKeyPrimary = leftKeyPrimary;
+ }
+
+ /**
+  * Tells whether the stream2 key is flagged as primary.
+  * @return the isRightKeyPrimary
+  */
+ public boolean isRightKeyPrimary()
+ {
+   return this.isRightKeyPrimary;
+ }
+
+ /**
+  * Flags the stream2 key as primary or not.
+  * @param rightKeyPrimary given rightKeyPrimary
+  */
+ public void setRightKeyPrimary(boolean rightKeyPrimary)
+ {
+   this.isRightKeyPrimary = rightKeyPrimary;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
new file mode 100644
index 0000000..dbf903d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.join;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+
+/**
+ * An abstract implementation of inner join operator over Managed state which extends from
+ * AbstractInnerJoinOperator.
+ *
+ * <b>Properties:</b><br>
+ * <b>noOfBuckets</b>: Number of buckets required for Managed state. <br>
+ * <b>bucketSpanTime</b>: Indicates the length of the time bucket. <br>
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends AbstractInnerJoinOperator<K,T> implements
+ Operator.CheckpointNotificationListener, Operator.IdleTimeHandler
+{
+ public static final String stateDir = "managedState"; // directory under the application path that holds the state
+ public static final String stream1State = "stream1Data"; // sub-directory / recovery-path suffix for stream1
+ public static final String stream2State = "stream2Data"; // sub-directory / recovery-path suffix for stream2
+ private transient Map<JoinEvent<K,T>, Future<List>> waitingEvents = Maps.newLinkedHashMap(); // tuples whose asynchronous lookup had not completed when they arrived
+ private int noOfBuckets = 1;
+ private Long bucketSpanTime; // optional; in milliseconds, applied in createStores() only when non-null
+ protected ManagedTimeStateImpl stream1Store; // managed state backing stream1Data
+ protected ManagedTimeStateImpl stream2Store; // managed state backing stream2Data
+
+ /**
+  * Creates one managed state per stream, configures each with the number of buckets, the
+  * optional bucket span and the expiry time, and wraps them into the multi-value stores
+  * used as stream1Data and stream2Data.
+  */
+ @Override
+ public void createStores()
+ {
+   stream1Store = new ManagedTimeStateImpl();
+   stream2Store = new ManagedTimeStateImpl();
+   for (ManagedTimeStateImpl state : new ManagedTimeStateImpl[] {stream1Store, stream2Store}) {
+     state.setNumBuckets(noOfBuckets);
+     if (bucketSpanTime != null) {
+       state.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+     }
+     state.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
+   }
+
+   // NOTE(review): the flag is the negated "key is primary" setting; presumably it tells the
+   // store whether a key may hold several values - confirm against ManagedTimeStateMultiValue.
+   stream1Data = new ManagedTimeStateMultiValue(stream1Store, !isLeftKeyPrimary());
+   stream2Data = new ManagedTimeStateMultiValue(stream2Store, !isRightKeyPrimary());
+ }
+
+ /**
+ * Process the tuple which are received from input ports with the following steps:
+ * 1) Extract key from the given tuple
+ * 2) Insert <key,tuple> into the store where store is the stream1Data if the tuple
+ * receives from stream1 or viceversa.
+ * 3) Get the values of the key in asynchronous if found it in opposite store
+ * 4) If the future is done then Merge the given tuple and values found from step (3) otherwise
+ * put it in waitingEvents
+ * @param tuple given tuple
+ * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+ */
+ @Override
+ protected void processTuple(T tuple, boolean isStream1Data)
+ {
+ Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
+ K key = extractKey(tuple,isStream1Data);
+ long timeBucket = extractTime(tuple,isStream1Data);
+ if (!((ManagedTimeStateMultiValue)store).put(key, tuple,timeBucket)) {
+ return;
+ }
+ Spillable.SpillableByteArrayListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data;
+ Future<List> future = ((ManagedTimeStateMultiValue)valuestore).getAsync(key);
+ if (future.isDone()) {
+ try {
+ joinStream(tuple,isStream1Data, future.get());
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ waitingEvents.put(new JoinEvent<>(key,tuple,isStream1Data),future);
+ }
+ }
+
+ /**
+  * Uses idle time to join parked tuples without blocking on lookups that are still running.
+  */
+ @Override
+ public void handleIdleTime()
+ {
+   if (!waitingEvents.isEmpty()) {
+     processWaitEvents(false);
+   }
+ }
+
+ /**
+  * Forwards the before-checkpoint notification to both managed states.
+  * @param l window id passed on unchanged
+  */
+ @Override
+ public void beforeCheckpoint(long l)
+ {
+   this.stream1Store.beforeCheckpoint(l);
+   this.stream2Store.beforeCheckpoint(l);
+ }
+
+ /**
+  * Forwards the checkpointed notification to both managed states.
+  * @param l window id passed on unchanged
+  */
+ @Override
+ public void checkpointed(long l)
+ {
+   this.stream1Store.checkpointed(l);
+   this.stream2Store.checkpointed(l);
+ }
+
+ /**
+  * Forwards the committed notification to both managed states.
+  * @param l window id passed on unchanged
+  */
+ @Override
+ public void committed(long l)
+ {
+   this.stream1Store.committed(l);
+   this.stream2Store.committed(l);
+ }
+
+ /**
+  * Runs the base setup, then gives each managed state its own base path
+  * ({@code <application path>/managedState/<operator id>/<stream name>}) and its own recovery
+  * path before setting the states up.
+  * @param context operator context
+  */
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+   super.setup(context);
+   final String operatorStatePath = context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + stateDir +
+       Path.SEPARATOR + String.valueOf(context.getId()) + Path.SEPARATOR;
+   ((FileAccessFSImpl)stream1Store.getFileAccess()).setBasePath(operatorStatePath + stream1State);
+   ((FileAccessFSImpl)stream2Store.getFileAccess()).setBasePath(operatorStatePath + stream2State);
+   stream1Store.getCheckpointManager().setRecoveryPath("managed_state_" + stream1State);
+   // Must target stream2Store: setting both paths on stream1Store overwrote stream1's recovery
+   // path with stream2's name and never configured stream2Store at all.
+   stream2Store.getCheckpointManager().setRecoveryPath("managed_state_" + stream2State);
+   stream1Store.setup(context);
+   stream2Store.setup(context);
+ }
+
+ /**
+  * Starts the window on both managed states before running the base class window start.
+  * @param windowId id of the window that begins
+  */
+ @Override
+ public void beginWindow(long windowId)
+ {
+   this.stream1Store.beginWindow(windowId);
+   this.stream2Store.beginWindow(windowId);
+   super.beginWindow(windowId);
+ }
+
+ /**
+  * Joins parked tuples whose asynchronous lookup has a result.
+  * With {@code finalize == false} only the first entry whose lookup is already done is joined
+  * and the method returns without blocking. With {@code finalize == true} every entry is
+  * joined, blocking on lookups that are still running.
+  * @param finalize whether or not to wait for the futures to return
+  * @throws RuntimeException if a lookup failed or the wait was interrupted
+  */
+ private void processWaitEvents(boolean finalize)
+ {
+   Iterator<Map.Entry<JoinEvent<K,T>, Future<List>>> waitIterator = waitingEvents.entrySet().iterator();
+   while (waitIterator.hasNext()) {
+     Map.Entry<JoinEvent<K,T>, Future<List>> waitingEvent = waitIterator.next();
+     Future<List> future = waitingEvent.getValue();
+     if (!finalize && !future.isDone()) {
+       continue;
+     }
+     JoinEvent<K,T> event = waitingEvent.getKey();
+     try {
+       joinStream(event.value, event.isStream1Data, future.get());
+     } catch (InterruptedException e) {
+       // future.get() may block here; keep the interrupt visible to the calling thread
+       // instead of losing it inside the wrapping exception.
+       Thread.currentThread().interrupt();
+       throw new RuntimeException("end window", e);
+     } catch (ExecutionException e) {
+       throw new RuntimeException("end window", e);
+     }
+     waitIterator.remove();
+     if (!finalize) {
+       break;
+     }
+   }
+ }
+
+ /**
+  * Joins every parked tuple, waiting for outstanding lookups, then ends the window on both
+  * managed states and on the base class.
+  */
+ @Override
+ public void endWindow()
+ {
+   processWaitEvents(true);
+   this.stream1Store.endWindow();
+   this.stream2Store.endWindow();
+   super.endWindow();
+ }
+
+ /**
+  * Tears down both managed states before the base class teardown.
+  */
+ @Override
+ public void teardown()
+ {
+   this.stream1Store.teardown();
+   this.stream2Store.teardown();
+   super.teardown();
+ }
+
+ /**
+  * Returns the number of buckets configured for each managed state.
+  * @return the noOfBuckets
+  */
+ public int getNoOfBuckets()
+ {
+   return this.noOfBuckets;
+ }
+
+ /**
+ * Set the number of buckets required for managed state; createStores() applies it to both states.
+ * @param noOfBuckets noOfBuckets
+ */
+ public void setNoOfBuckets(int noOfBuckets)
+ {
+ this.noOfBuckets = noOfBuckets;
+ }
+
+ /**
+  * Returns the configured length of a time bucket, or null when none was set.
+  * @return the bucketSpanTime
+  */
+ public Long getBucketSpanTime()
+ {
+   return this.bucketSpanTime;
+ }
+
+ /**
+ * Sets the length of the time bucket, in milliseconds, required for managed state.
+ * @param bucketSpanTime given bucketSpanTime
+ */
+ public void setBucketSpanTime(Long bucketSpanTime)
+ {
+ this.bucketSpanTime = bucketSpanTime;
+ }
+
+ /**
+  * A tuple parked until the asynchronous lookup of its key completes. The class does not
+  * override equals/hashCode, so each instance is a distinct map key.
+  */
+ public static class JoinEvent<K,T>
+ {
+   /** Join key of the parked tuple. */
+   public K key;
+   /** The parked tuple itself. */
+   public T value;
+   /** Whether the tuple arrived on stream1. */
+   public boolean isStream1Data;
+
+   public JoinEvent(K key, T value, boolean isStream1Data)
+   {
+     this.isStream1Data = isStream1Data;
+     this.value = value;
+     this.key = key;
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/join/JoinStreamCodec.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/JoinStreamCodec.java b/library/src/main/java/org/apache/apex/malhar/lib/join/JoinStreamCodec.java
new file mode 100644
index 0000000..7a34699
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/join/JoinStreamCodec.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.join;
+
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Stream codec for the POJO Inner Join Operator that partitions tuples by the hash code of
+ * the value read through a key expression.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class JoinStreamCodec extends KryoSerializableStreamCodec<Object>
+{
+  // Created on first use from the class of the first tuple seen, then reused.
+  private transient PojoUtils.Getter<Object, Object> getter;
+  private String keyExpression;
+
+  /**
+   * @param keyExpression expression that reads the join key from a tuple
+   */
+  public JoinStreamCodec(String keyExpression)
+  {
+    this.keyExpression = keyExpression;
+  }
+
+  /**
+   * Returns the hash code of the tuple's key as the partition.
+   * @param o tuple to partition
+   * @return hash code of the key read through keyExpression
+   */
+  @Override
+  public int getPartition(Object o)
+  {
+    PojoUtils.Getter<Object, Object> keyGetter = getter;
+    if (keyGetter == null) {
+      keyGetter = PojoUtils.createGetter(o.getClass(), keyExpression, Object.class);
+      getter = keyGetter;
+    }
+    final Object key = keyGetter.get(o);
+    return key.hashCode();
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperator.java
new file mode 100644
index 0000000..0b23808
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperator.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator<Object,Object> implements Operator.ActivationListener<Context>
+{
+ private transient long timeIncrement; // application window length in ms, computed in setup(); added to time in endWindow()
+ private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2); // [0] = stream1, [1] = stream2; elements are created in setup()
+ protected transient Class<?> outputClass; // TUPLE_CLASS of the output port, recorded in outputPort.setup()
+ private long time = System.currentTimeMillis(); // time returned by extractTime() when no time fields are configured
+
+ /**
+  * Output port for the joined tuples; its setup records the tuple class that mergeTuples
+  * instantiates.
+  */
+ @OutputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>()
+ {
+   @Override
+   public void setup(Context.PortContext context)
+   {
+     final Class<?> tupleClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+     outputClass = tupleClass;
+   }
+ };
+
+ /**
+  * Input port for stream1. Its setup records the tuple class of the stream, every tuple is
+  * processed as stream1 data, and the stream is partitioned by the left key expression.
+  */
+ @InputPortFieldAnnotation(schemaRequired = true)
+ public transient DefaultInputPort<Object> input1 = new DefaultInputPort<Object>()
+ {
+   @Override
+   public void setup(Context.PortContext context)
+   {
+     final Class<?> tupleClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+     inputFieldObjects[0].inputClass = tupleClass;
+   }
+
+   @Override
+   public void process(Object tuple)
+   {
+     processTuple(tuple, true);
+   }
+
+   @Override
+   public StreamCodec<Object> getStreamCodec()
+   {
+     return getInnerJoinStreamCodec(true);
+   }
+ };
+
+ /**
+  * Input port for stream2. Its setup records the tuple class of the stream, every tuple is
+  * processed as stream2 data, and the stream is partitioned by the right key expression.
+  */
+ @InputPortFieldAnnotation(schemaRequired = true)
+ public transient DefaultInputPort<Object> input2 = new DefaultInputPort<Object>()
+ {
+   @Override
+   public void setup(Context.PortContext context)
+   {
+     final Class<?> tupleClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+     inputFieldObjects[1].inputClass = tupleClass;
+   }
+
+   @Override
+   public void process(Object tuple)
+   {
+     processTuple(tuple, false);
+   }
+
+   @Override
+   public StreamCodec<Object> getStreamCodec()
+   {
+     return getInnerJoinStreamCodec(false);
+   }
+ };
+
+ /**
+  * Computes the application window length in milliseconds, runs the base setup and creates
+  * the per-stream holders that the input ports and generateSettersAndGetters fill in.
+  * @param context operator context
+  */
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+   timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+       context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+   super.setup(context);
+   // inputFieldObjects always has two slots: one per input stream.
+   for (int stream = 0; stream < inputFieldObjects.length; stream++) {
+     inputFieldObjects[stream] = new FieldObjectMap();
+   }
+ }
+
+ /**
+  * Extract the time value from the given tuple. The tuple's own time field is used only when
+  * a getter for it exists; otherwise the operator's window-based time is returned.
+  * @param tuple given tuple
+  * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+  * @return the time in milliseconds
+  */
+ @Override
+ public long extractTime(Object tuple, boolean isStream1Data)
+ {
+   if (timeFields == null) {
+     return time;
+   }
+   // generateSettersAndGetters creates the time getters only when exactly two time fields are
+   // configured; with any other count the getter is null and dereferencing it threw an NPE.
+   final PojoUtils.Getter timeGetter = inputFieldObjects[isStream1Data ? 0 : 1].timeFieldGet;
+   if (timeGetter == null) {
+     return time;
+   }
+   return (long)timeGetter.get(tuple);
+ }
+
+ /**
+ * Create getters for the key and time fields and setters for the include fields.
+ */
+ private void generateSettersAndGetters()
+ {
+ for (int i = 0; i < 2; i++) {
+ Class inputClass = inputFieldObjects[i].inputClass;
+ try {
+ inputFieldObjects[i].keyGet = PojoUtils.createGetter(inputClass, keyFieldExpressions.get(i), Object.class);
+ if (timeFields != null && timeFields.size() == 2) {
+ Class timeField = ClassUtils.primitiveToWrapper(inputClass.getField(timeFields.get(i)).getType());
+ inputFieldObjects[i].timeFieldGet = PojoUtils.createGetter(inputClass, timeFields.get(i), timeField);
+ }
+ for (int j = 0; j < includeFields[i].length; j++) {
+ Class inputField = ClassUtils.primitiveToWrapper(inputClass.getField(includeFields[i][j]).getType());
+ Class outputField = ClassUtils.primitiveToWrapper(outputClass.getField(includeFields[i][j]).getType());
+ if (inputField != outputField) {
+ continue;
+ }
+ inputFieldObjects[i].fieldMap.put(PojoUtils.createGetter(inputClass, includeFields[i][j], inputField),
+ PojoUtils.createSetter(outputClass, includeFields[i][j], outputField));
+ }
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+  * Reads the join key of the tuple through the key getter of the stream it belongs to.
+  * @param tuple given tuple
+  * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+  * @return the key object
+  */
+ @Override
+ public Object extractKey(Object tuple, boolean isStream1Data)
+ {
+   final int stream = isStream1Data ? 0 : 1;
+   return inputFieldObjects[stream].keyGet.get(tuple);
+ }
+
+ /**
+  * Creates a new output object and copies into it the include fields of both tuples: first
+  * those of the stream1 tuple, then those of the stream2 tuple.
+  * @param tuple1 tuple belongs to stream1
+  * @param tuple2 tuple belongs to stream2
+  * @return the merged output object
+  * @throws RuntimeException if the output class cannot be instantiated
+  */
+ @Override
+ public Object mergeTuples(Object tuple1, Object tuple2)
+ {
+   final Object merged;
+   try {
+     merged = outputClass.newInstance();
+     final Object[] sources = {tuple1, tuple2};
+     for (int stream = 0; stream < sources.length; stream++) {
+       for (Map.Entry<PojoUtils.Getter,PojoUtils.Setter> accessors : inputFieldObjects[stream].fieldMap.entrySet()) {
+         accessors.getValue().set(merged, accessors.getKey().get(sources[stream]));
+       }
+     }
+   } catch (InstantiationException | IllegalAccessException e) {
+     throw new RuntimeException(e);
+   }
+   return merged;
+ }
+
+ /**
+  * Sends the joined tuple to outputPort.
+  * @param tuple given tuple
+  */
+ @Override
+ public void emitTuple(Object tuple)
+ {
+   this.outputPort.emit(tuple);
+ }
+
+ /**
+  * Builds the getters and setters from the input and output tuple classes the ports recorded.
+  * @param context activation context (unused)
+  */
+ @Override
+ public void activate(Context context)
+ {
+   this.generateSettersAndGetters();
+ }
+
+ /**
+  * Nothing to release on deactivation.
+  */
+ @Override
+ public void deactivate()
+ {
+   // intentionally empty
+ }
+
+ /**
+  * Ends the window in the base class, then advances the operator's window-based time by one
+  * application window.
+  */
+ @Override
+ public void endWindow()
+ {
+   super.endWindow();
+   time = time + timeIncrement;
+ }
+
+ /**
+ * Returns the streamcodec for the streams
+ * @param isStream1data Specifies whether the codec needs for stream1 or stream2.
+ * @return the object of JoinStreamCodec
+ */
+ private StreamCodec<Object> getInnerJoinStreamCodec(boolean isStream1data)
+ {
+ if (isStream1data) {
+ return new JoinStreamCodec(getLeftKeyExpression());
+ }
+ return new JoinStreamCodec(getRightKeyExpression());
+ }
+
+ /**
+  * Per-stream holder of the tuple class, the key and time getters, and the getter/setter pairs
+  * that copy include fields into the output object. Declared static because it uses nothing
+  * from the enclosing operator, so it should not carry a hidden reference to it.
+  */
+ private static class FieldObjectMap
+ {
+   public Class<?> inputClass;
+   public PojoUtils.Getter keyGet;
+   // Stays null unless exactly two time fields are configured.
+   public PojoUtils.Getter timeFieldGet;
+   public Map<PojoUtils.Getter,PojoUtils.Setter> fieldMap;
+
+   public FieldObjectMap()
+   {
+     fieldMap = new HashMap<>();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index 927a6df..25b3f8b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -172,6 +172,8 @@ public abstract class AbstractManagedStateImpl
protected final transient ListMultimap<Long, ValueFetchTask> tasksPerBucketId =
Multimaps.synchronizedListMultimap(ArrayListMultimap.<Long, ValueFetchTask>create());
+ protected Bucket.ReadSource asyncReadSource = Bucket.ReadSource.ALL;
+
@Override
public void setup(OperatorContext context)
{
@@ -569,7 +571,7 @@ public abstract class AbstractManagedStateImpl
synchronized (bucket) {
//a particular bucket should only be handled by one thread at any point of time. Handling of bucket here
//involves creating readers for the time buckets and de-serializing key/value from a reader.
- Slice value = bucket.get(key, timeBucketId, Bucket.ReadSource.ALL);
+ Slice value = bucket.get(key, timeBucketId, this.managedState.asyncReadSource);
managedState.tasksPerBucketId.remove(bucket.getBucketId(), this);
return value;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
new file mode 100644
index 0000000..fd7250d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Concrete implementation of SpillableByteArrayListMultimap which is needed for join operator.
+ *
+ * <b>Properties:</b><br>
+ * <b>isKeyContainsMultiValue</b>: Specifies whether a key can have multiple values or not. <br>
+ * <b>timeBucket</b>: Specifies the length of the time bucket.
+ *
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableByteArrayListMultimap<K,V>
+{
+ private transient StreamCodec streamCodec = null;
+ private boolean isKeyContainsMultiValue = false;
+ private long timeBucket;
+ @NotNull
+ private ManagedTimeStateImpl store;
+
+ /**
+ * Creates an instance that uses a KryoSerializableStreamCodec for keys and values.
+ * The store is not set by this constructor.
+ */
+ public ManagedTimeStateMultiValue()
+ {
+   // The field initializer leaves streamCodec null, so the default codec is always installed here.
+   streamCodec = new KryoSerializableStreamCodec();
+ }
+
+ /**
+ * Creates an instance backed by the given store.
+ * As a side effect the store's {@code asyncReadSource} is switched to {@code Bucket.ReadSource.READERS}.
+ * @param store backing managed state; must not be null
+ * @param isKeyContainsMultiValue true if a key maps to a list of values, false if it maps to a single value
+ * @throws NullPointerException if store is null
+ */
+ public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, boolean isKeyContainsMultiValue)
+ {
+   this();
+   Preconditions.checkNotNull(store);
+   this.isKeyContainsMultiValue = isKeyContainsMultiValue;
+   this.store = store;
+   store.asyncReadSource = Bucket.ReadSource.READERS;
+ }
+
+ /**
+ * Synchronously reads the values stored for the given key.
+ * @param k given key
+ * @return the stored list when keys hold multiple values, a one-element list when they hold a
+ *         single value, or null when the store returns no usable slice for the key
+ */
+ @Override
+ public List<V> get(@Nullable K k)
+ {
+   final Slice stored = store.getSync(getBucketId(k), streamCodec.toByteArray(k));
+   final boolean absent = stored == null || stored.length == 0 || stored.buffer == null;
+   if (absent) {
+     return null;
+   }
+   final Object decoded = streamCodec.fromByteArray(stored);
+   if (!isKeyContainsMultiValue) {
+     // Single-value mode: wrap the lone value so callers always receive a list.
+     List<V> single = new ArrayList<>();
+     single.add((V)decoded);
+     return single;
+   }
+   return (List<V>)decoded;
+ }
+
+ /**
+ * Starts an asynchronous read of the given key from the store.
+ * @param k given key
+ * @return a CompositeFuture wrapping the store's future; its get() decodes the slice into a list
+ */
+ public CompositeFuture getAsync(@Nullable K k)
+ {
+   final int bucketId = getBucketId(k);
+   final Slice keySlice = streamCodec.toByteArray(k);
+   final Future<Slice> pending = store.getAsync(bucketId, keySlice);
+   return new CompositeFuture(pending);
+ }
+
+ // The Multimap operations in this run are not implemented: each one throws
+ // UnsupportedOperationException, except clear(), which is an empty no-op.
+
+ /** Not supported; always throws UnsupportedOperationException. */
+ @Override
+ public Set<K> keySet()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported; always throws UnsupportedOperationException. */
+ @Override
+ public Multiset<K> keys()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported; always throws UnsupportedOperationException. */
+ @Override
+ public Collection<V> values()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported; always throws UnsupportedOperationException. */
+ @Override
+ public Collection<Map.Entry<K, V>> entries()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported; always throws UnsupportedOperationException. */
+ @Override
+ public List<V> removeAll(@Nullable Object o)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Does nothing: the body is empty, so no entries are removed from the store.
+ * NOTE(review): unlike the neighbouring operations this does not throw - confirm the no-op is intended.
+ */
+ @Override
+ public void clear()
+ {
+
+ }
+
+ /** Not supported; always throws UnsupportedOperationException. */
+ @Override
+ public int size()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported; always throws UnsupportedOperationException. */
+ @Override
+ public boolean isEmpty()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported; always throws UnsupportedOperationException. */
+ @Override
+ public boolean containsKey(@Nullable Object o)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported; always throws UnsupportedOperationException. */
+ @Override
+ public boolean containsValue(@Nullable Object o)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported; always throws UnsupportedOperationException. */
+ @Override
+ public boolean containsEntry(@Nullable Object o, @Nullable Object o1)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Inserts the (k,v) into the store using the time bucket configured on this instance.
+ * @param k key
+ * @param v value
+ * @return true if the given (k,v) is successfully inserted into the store otherwise false.
+ */
+ @Override
+ public boolean put(@Nullable K k, @Nullable V v)
+ {
+   // Same steps as the three-argument overload, with the configured timeBucket field as the time bucket.
+   return put(k, v, timeBucket);
+ }
+
+ /**
+ * Inserts the (k,v) into the store using the specified timebucket.
+ * In multi-value mode the list currently stored for the key is read synchronously, the value is
+ * appended and the whole list is written back; otherwise the value is written directly.
+ * @param k key
+ * @param v value
+ * @param timeBucket timebucket
+ * @return true if the given (k,v) is successfully inserted into the store otherwise false.
+ */
+ public boolean put(@Nullable K k, @Nullable V v, long timeBucket)
+ {
+   final Slice keySlice = streamCodec.toByteArray(k);
+   final int bucketId = getBucketId(k);
+   if (!isKeyContainsMultiValue) {
+     return insertInStore(bucketId, timeBucket, keySlice, streamCodec.toByteArray(v));
+   }
+   Slice valueSlice = store.getSync(bucketId, keySlice);
+   List<V> listOb;
+   // A slice without a backing buffer carries no data; treat it as absent, exactly as get() does,
+   // instead of handing it to the codec.
+   if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) {
+     listOb = new ArrayList<>();
+   } else {
+     listOb = (List<V>)streamCodec.fromByteArray(valueSlice);
+   }
+   listOb.add(v);
+   return insertInStore(bucketId, timeBucket, keySlice, streamCodec.toByteArray(listOb));
+ }
+
+ /**
+ * Insert (keySlice,valueSlice) into the store using bucketId and timeBucket.
+ * @param bucketId bucket Id
+ * @param timeBucket time bucket
+ * @param keySlice key slice
+ * @param valueSlice value slice
+ * @return true if the given (keySlice,valueSlice) is successfully inserted into the
+ * store otherwise false.
+ */
+ private boolean insertInStore(long bucketId, long timeBucket, Slice keySlice, Slice valueSlice)
+ {
+ long timeBucketId = store.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(timeBucket);
+ if (timeBucketId != -1) {
+ store.putInBucket(bucketId, timeBucketId, keySlice, valueSlice);
+ return true;
+ }
+ return false;
+ }
+
+ /** Not supported; always throws UnsupportedOperationException. */
+ @Override
+ public boolean remove(@Nullable Object o, @Nullable Object o1)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported; always throws UnsupportedOperationException. */
+ @Override
+ public boolean putAll(@Nullable K k, Iterable<? extends V> iterable)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported; always throws UnsupportedOperationException. */
+ @Override
+ public boolean putAll(Multimap<? extends K, ? extends V> multimap)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported; always throws UnsupportedOperationException. */
+ @Override
+ public List<V> replaceValues(K k, Iterable<? extends V> iterable)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported; always throws UnsupportedOperationException. */
+ @Override
+ public Map<K, Collection<V>> asMap()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Maps a key to a bucket id in the range [0, numBuckets) based on its hash code.
+ * Keys with a non-negative hash code get the same id as before; keys with a negative hash code,
+ * which previously produced a negative id, are shifted into the valid range.
+ * @param k key; must not be null
+ * @return bucket id for the key
+ */
+ public int getBucketId(K k)
+ {
+   final int numBuckets = store.getNumBuckets();
+   final int remainder = k.hashCode() % numBuckets;
+   // Java's % keeps the sign of the dividend, so normalise negative remainders.
+   return remainder < 0 ? remainder + numBuckets : remainder;
+ }
+
+ /**
+ * @return the time bucket value that {@code put(k, v)} passes to the store
+ */
+ public long getTimeBucket()
+ {
+ return timeBucket;
+ }
+
+ /**
+ * @param timeBucket the time bucket value that {@code put(k, v)} passes to the store
+ */
+ public void setTimeBucket(long timeBucket)
+ {
+ this.timeBucket = timeBucket;
+ }
+
+ /**
+ * @return the codec used to serialize keys and values
+ */
+ public StreamCodec getStreamCodec()
+ {
+ return streamCodec;
+ }
+
+ /**
+ * @param streamCodec the codec used to serialize keys and values
+ */
+ public void setStreamCodec(StreamCodec streamCodec)
+ {
+ this.streamCodec = streamCodec;
+ }
+
+ /**
+ * Future that wraps the store's {@code Future<Slice>} and decodes the resulting slice into a
+ * list of values on retrieval. Cancellation and completion state are delegated to the wrapped future.
+ */
+ public class CompositeFuture implements Future<List>
+ {
+   public Future<Slice> slice;
+
+   public CompositeFuture(Future<Slice> slice)
+   {
+     this.slice = slice;
+   }
+
+   @Override
+   public boolean cancel(boolean b)
+   {
+     return slice.cancel(b);
+   }
+
+   @Override
+   public boolean isCancelled()
+   {
+     return slice.isCancelled();
+   }
+
+   @Override
+   public boolean isDone()
+   {
+     return slice.isDone();
+   }
+
+   /**
+    * Waits for the wrapped future and converts its slice into a list.
+    * @return the list of values, or null if the slice is null, empty or has no buffer
+    * @throws InterruptedException
+    * @throws ExecutionException
+    */
+   @Override
+   public List get() throws InterruptedException, ExecutionException
+   {
+     return decode(slice.get());
+   }
+
+   /**
+    * Waits at most the given time for the wrapped future and converts its slice into a list.
+    * @param l maximum time to wait
+    * @param timeUnit unit of the timeout
+    * @return the list of values, or null if the slice is null, empty or has no buffer
+    * @throws InterruptedException
+    * @throws ExecutionException
+    * @throws TimeoutException if the wrapped future did not complete in time
+    */
+   @Override
+   public List get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException
+   {
+     return decode(slice.get(l, timeUnit));
+   }
+
+   /**
+    * Decodes a slice read from the store: the stored list in multi-value mode, otherwise a
+    * one-element list wrapping the single stored value.
+    */
+   private List<V> decode(Slice valueSlice)
+   {
+     if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) {
+       return null;
+     }
+     if (isKeyContainsMultiValue) {
+       return (List<V>)streamCodec.fromByteArray(valueSlice);
+     }
+     List<V> value = new ArrayList<>();
+     value.add((V)streamCodec.fromByteArray(valueSlice));
+     return value;
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/test/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperatorTest.java
new file mode 100644
index 0000000..0b31c32
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperatorTest.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.join;
+
+import java.io.IOException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.stram.engine.PortContext;
+
+public class POJOInnerJoinOperatorTest
+{
+ @Rule
+ public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo();
+ private static final String APPLICATION_PATH_PREFIX = "target/InnerJoinPOJOTest/";
+ private String applicationPath;
+ private Attribute.AttributeMap.DefaultAttributeMap attributes;
+ Context.OperatorContext context;
+
+ /**
+ * Left-side test tuple: a customer identified by ID.
+ */
+ public static class Customer
+ {
+   public int ID;
+   public String Name;
+
+   public Customer()
+   {
+   }
+
+   public Customer(int ID, String name)
+   {
+     this.ID = ID;
+     this.Name = name;
+   }
+
+   @Override
+   public String toString()
+   {
+     StringBuilder text = new StringBuilder("Customer{");
+     text.append("ID=").append(ID);
+     text.append(", Name='").append(Name).append('\'');
+     return text.append('}').toString();
+   }
+ }
+
+ /**
+ * Right-side test tuple: an order that refers to a customer through CID.
+ */
+ public static class Order
+ {
+   public int OID;
+   public int CID;
+   public int Amount;
+
+   public Order()
+   {
+   }
+
+   public Order(int OID, int CID, int amount)
+   {
+     this.OID = OID;
+     this.CID = CID;
+     this.Amount = amount;
+   }
+
+   @Override
+   public String toString()
+   {
+     StringBuilder text = new StringBuilder("Order{");
+     text.append("OID=").append(OID);
+     text.append(", CID=").append(CID);
+     text.append(", Amount=").append(Amount);
+     return text.append('}').toString();
+   }
+ }
+
+ /**
+ * Output test tuple: customer fields combined with order fields.
+ */
+ public static class CustOrder
+ {
+   public int ID;
+   public String Name;
+   public int OID;
+   public int Amount;
+
+   public CustOrder()
+   {
+   }
+
+   @Override
+   public String toString()
+   {
+     StringBuilder text = new StringBuilder("{");
+     text.append("ID=").append(ID);
+     text.append(", Name='").append(Name).append('\'');
+     text.append(", OID=").append(OID);
+     text.append(", Amount=").append(Amount);
+     return text.append('}').toString();
+   }
+ }
+
+ /**
+ * Prepares a unique application path and an operator context (operator id 1) carrying that path.
+ */
+ @Before
+ public void beforeTest()
+ {
+   final String uniquePath = OperatorContextTestHelper.getUniqueApplicationPath(APPLICATION_PATH_PREFIX);
+   final Attribute.AttributeMap.DefaultAttributeMap attrs = new Attribute.AttributeMap.DefaultAttributeMap();
+   attrs.put(DAG.APPLICATION_PATH, uniquePath);
+   applicationPath = uniquePath;
+   attributes = attrs;
+   context = new OperatorContextTestHelper.TestIdOperatorContext(1, attrs);
+ }
+
+ /**
+ * Recursively deletes the application path created for the test.
+ * The FileSystem obtained through newInstance is a dedicated instance, so it is closed here
+ * once the delete has been issued.
+ */
+ @After
+ public void afterTest()
+ {
+   Path root = new Path(applicationPath);
+   try (FileSystem fs = FileSystem.newInstance(root.toUri(), new Configuration())) {
+     fs.delete(root, true);
+   } catch (IOException e) {
+     throw new RuntimeException(e);
+   }
+ }
+
+ /**
+ * Joins one customer (ID 1) against three orders (CID 1, 3 and 7) within a single window and
+ * expects exactly one joined tuple, built from the customer and the order with CID 1.
+ */
+ @Test
+ public void testInnerJoinOperator() throws IOException, InterruptedException
+ {
+ POJOInnerJoinOperator oper = new POJOInnerJoinOperator();
+ oper.setIncludeFieldStr("ID,Name;OID,Amount");
+ oper.setLeftKeyExpression("ID");
+ oper.setRightKeyExpression("CID");
+ oper.setExpiryTime(10000L);
+
+ oper.setup(context);
+ // The same attribute map is reused: TUPLE_CLASS is overwritten before each port is set up.
+ attributes.put(DAG.InputPortMeta.TUPLE_CLASS, CustOrder.class);
+ oper.outputPort.setup(new PortContext(attributes,context));
+
+ attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Customer.class);
+ oper.input1.setup(new PortContext(attributes,context));
+ attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Order.class);
+ oper.input2.setup(new PortContext(attributes,context));
+ oper.activate(context);
+
+ CollectorTestSink<CustOrder> sink = new CollectorTestSink<>();
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+ oper.outputPort.setSink(tmp);
+ oper.beginWindow(0);
+
+ Customer tuple = new Customer(1, "Anil");
+
+ oper.input1.process(tuple);
+ Order order = new Order(102, 1, 300);
+
+ oper.input2.process(order);
+ // Orders with CID 3 and 7 have no matching customer.
+ Order order2 = new Order(103, 3, 300);
+ oper.input2.process(order2);
+ Order order3 = new Order(104, 7, 300);
+ oper.input2.process(order3);
+
+ oper.endWindow();
+
+ /* Number of tuple, emitted */
+ Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size());
+ CustOrder emitted = sink.collectedTuples.iterator().next();
+
+ Assert.assertEquals("value of ID :", tuple.ID, emitted.ID);
+ Assert.assertEquals("value of Name :", tuple.Name, emitted.Name);
+
+ Assert.assertEquals("value of OID: ", order.OID, emitted.OID);
+ Assert.assertEquals("value of Amount: ", order.Amount, emitted.Amount);
+
+ oper.teardown();
+ }
+
+ /**
+ * Feeds two orders with CID 1 in different windows before the matching customer arrives and
+ * expects two joined tuples, the first for order 102 and the second for order 104.
+ */
+ @Test
+ public void testMultipleValues() throws IOException, InterruptedException
+ {
+ POJOInnerJoinOperator oper = new POJOInnerJoinOperator();
+ oper.setIncludeFieldStr("ID,Name;OID,Amount");
+ oper.setLeftKeyExpression("ID");
+ oper.setRightKeyExpression("CID");
+ oper.setExpiryTime(10000L);
+
+ oper.setup(context);
+ // The same attribute map is reused: TUPLE_CLASS is overwritten before each port is set up.
+ attributes.put(DAG.InputPortMeta.TUPLE_CLASS, CustOrder.class);
+ oper.outputPort.setup(new PortContext(attributes,context));
+
+ attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Customer.class);
+ oper.input1.setup(new PortContext(attributes,context));
+ attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Order.class);
+ oper.input2.setup(new PortContext(attributes,context));
+ oper.activate(context);
+
+ CollectorTestSink<CustOrder> sink = new CollectorTestSink<>();
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+ oper.outputPort.setSink(tmp);
+
+ // Window 0: only orders arrive (CID 1 and CID 3).
+ oper.beginWindow(0);
+ Order order = new Order(102, 1, 300);
+ oper.input2.process(order);
+
+ Order order2 = new Order(103, 3, 300);
+ oper.input2.process(order2);
+ oper.endWindow();
+ // Window 1: a second order with CID 1, then the customer with ID 1.
+ oper.beginWindow(1);
+
+ Order order3 = new Order(104, 1, 300);
+ oper.input2.process(order3);
+ Customer tuple = new Customer(1, "Anil");
+ oper.input1.process(tuple);
+ oper.endWindow();
+
+ /* Number of tuple, emitted */
+ Assert.assertEquals("Number of tuple emitted ", 2, sink.collectedTuples.size());
+ CustOrder emitted = sink.collectedTuples.get(0);
+
+ Assert.assertEquals("value of ID :", tuple.ID, emitted.ID);
+ Assert.assertEquals("value of Name :", tuple.Name, emitted.Name);
+ Assert.assertEquals("value of OID: ", order.OID, emitted.OID);
+ Assert.assertEquals("value of Amount: ", order.Amount, emitted.Amount);
+
+ emitted = sink.collectedTuples.get(1);
+ Assert.assertEquals("value of ID :", tuple.ID, emitted.ID);
+ Assert.assertEquals("value of Name :", tuple.Name, emitted.Name);
+ Assert.assertEquals("value of OID: ", order3.OID, emitted.OID);
+ Assert.assertEquals("value of Amount: ", order3.Amount, emitted.Amount);
+ oper.teardown();
+ }
+
+ /**
+ * With the left key marked as primary, sends two customers with the same ID in consecutive
+ * windows and expects the single joined tuple to carry the second customer's name.
+ */
+ @Test
+ public void testUpdateStream1Values() throws IOException, InterruptedException
+ {
+ POJOInnerJoinOperator oper = new POJOInnerJoinOperator();
+ oper.setIncludeFieldStr("ID,Name;OID,Amount");
+ oper.setLeftKeyExpression("ID");
+ oper.setRightKeyExpression("CID");
+ oper.setLeftKeyPrimary(true);
+ oper.setExpiryTime(10000L);
+
+ oper.setup(context);
+ // The same attribute map is reused: TUPLE_CLASS is overwritten before each port is set up.
+ attributes.put(DAG.InputPortMeta.TUPLE_CLASS, CustOrder.class);
+ oper.outputPort.setup(new PortContext(attributes,context));
+
+ attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Customer.class);
+ oper.input1.setup(new PortContext(attributes,context));
+ attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Order.class);
+ oper.input2.setup(new PortContext(attributes,context));
+ oper.activate(context);
+
+ CollectorTestSink<CustOrder> sink = new CollectorTestSink<>();
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+ oper.outputPort.setSink(tmp);
+
+ // Window 0: first customer with ID 1.
+ oper.beginWindow(0);
+ Customer tuple1 = new Customer(1, "Anil");
+ oper.input1.process(tuple1);
+ oper.endWindow();
+ // Window 1: second customer with the same ID, followed by the orders.
+ oper.beginWindow(1);
+
+ Customer tuple2 = new Customer(1, "Join");
+ oper.input1.process(tuple2);
+ Order order = new Order(102, 1, 300);
+ oper.input2.process(order);
+ Order order2 = new Order(103, 3, 300);
+ oper.input2.process(order2);
+ oper.endWindow();
+
+ /* Number of tuple, emitted */
+ Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size());
+ CustOrder emitted = sink.collectedTuples.get(0);
+
+ Assert.assertEquals("value of ID :", tuple2.ID, emitted.ID);
+ Assert.assertEquals("value of Name :", tuple2.Name, emitted.Name);
+ Assert.assertEquals("value of OID: ", order.OID, emitted.OID);
+ Assert.assertEquals("value of Amount: ", order.Amount, emitted.Amount);
+ oper.teardown();
+ }
+
+ /**
+ * With the left key marked as primary, sends one customer followed by two orders with the same
+ * CID in a single window and expects one joined tuple per order, in arrival order.
+ */
+ @Test
+ public void testEmitMultipleTuplesFromStream2() throws IOException, InterruptedException
+ {
+ POJOInnerJoinOperator oper = new POJOInnerJoinOperator();
+ oper.setIncludeFieldStr("ID,Name;OID,Amount");
+ oper.setLeftKeyExpression("ID");
+ oper.setRightKeyExpression("CID");
+ oper.setLeftKeyPrimary(true);
+ oper.setExpiryTime(10000L);
+
+ oper.setup(context);
+ // The same attribute map is reused: TUPLE_CLASS is overwritten before each port is set up.
+ attributes.put(DAG.InputPortMeta.TUPLE_CLASS, CustOrder.class);
+ oper.outputPort.setup(new PortContext(attributes,context));
+
+ attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Customer.class);
+ oper.input1.setup(new PortContext(attributes,context));
+ attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Order.class);
+ oper.input2.setup(new PortContext(attributes,context));
+ oper.activate(context);
+
+ CollectorTestSink<CustOrder> sink = new CollectorTestSink<>();
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+ oper.outputPort.setSink(tmp);
+
+ oper.beginWindow(0);
+ Customer tuple1 = new Customer(1, "Anil");
+ oper.input1.process(tuple1);
+ Order order = new Order(102, 1, 300);
+ oper.input2.process(order);
+ Order order2 = new Order(103, 1, 300);
+ oper.input2.process(order2);
+ oper.endWindow();
+
+ /* Number of tuple, emitted */
+ Assert.assertEquals("Number of tuple emitted ", 2, sink.collectedTuples.size());
+ CustOrder emitted = sink.collectedTuples.get(0);
+
+ Assert.assertEquals("value of ID :", tuple1.ID, emitted.ID);
+ Assert.assertEquals("value of Name :", tuple1.Name, emitted.Name);
+ Assert.assertEquals("value of OID: ", order.OID, emitted.OID);
+ Assert.assertEquals("value of Amount: ", order.Amount, emitted.Amount);
+ emitted = sink.collectedTuples.get(1);
+ Assert.assertEquals("value of ID :", tuple1.ID, emitted.ID);
+ Assert.assertEquals("value of Name :", tuple1.Name, emitted.Name);
+ Assert.assertEquals("value of OID: ", order2.OID, emitted.OID);
+ Assert.assertEquals("value of Amount: ", order2.Amount, emitted.Amount);
+ oper.teardown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/test/java/org/apache/apex/malhar/lib/join/POJOPartitionJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/join/POJOPartitionJoinOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/join/POJOPartitionJoinOperatorTest.java
new file mode 100644
index 0000000..9f6161b
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/join/POJOPartitionJoinOperatorTest.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.join;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+public class POJOPartitionJoinOperatorTest
+{
+ public static final int NUM_OF_PARTITIONS = 4;
+ public static final int TOTAL_TUPLES_PROCESS = 1000;
+ private static boolean testFailed = false;
+
+ /**
+ * Join operator used to verify partitioning. Instead of joining, it records which operator id
+ * saw each stream1 key and flags a failure when the same key is seen under a different id.
+ * As a StatsListener it releases {@code latch} once the reported metric reaches
+ * TOTAL_TUPLES_PROCESS.
+ */
+ public static class PartitionTestJoinOperator extends POJOInnerJoinOperator implements StatsListener
+ {
+   public int operatorId;
+   HashMap<Integer, Integer> partitionMap = Maps.newHashMap();
+   transient CountDownLatch latch = new CountDownLatch(1);
+   int tuplesProcessed = 0;
+   @AutoMetric
+   int tuplesProcessedCompletely = 0;
+
+   @Override
+   public void setup(Context.OperatorContext context)
+   {
+     super.setup(context);
+     operatorId = context.getId();
+   }
+
+   /**
+    * Replaces the join logic: only stream1 tuples are inspected, and nothing is emitted.
+    */
+   @Override
+   protected void processTuple(Object tuple, boolean isStream1Data)
+   {
+     // Verifying the data for stream1
+     if (!isStream1Data) {
+       return;
+     }
+     int key = (int)extractKey(tuple, isStream1Data);
+     if (partitionMap.containsKey(key)) {
+       if (partitionMap.get(key) != operatorId) {
+         testFailed = true;
+       }
+     } else {
+       partitionMap.put(key, operatorId);
+     }
+     tuplesProcessed++;
+   }
+
+   @Override
+   public void endWindow()
+   {
+     super.endWindow();
+     // Publish the count only at the window boundary so the metric reflects completed windows.
+     tuplesProcessedCompletely = tuplesProcessed;
+   }
+
+   /**
+    * Reads the "tuplesProcessedCompletely" metric from the most recent window stats and counts
+    * down the latch once it reaches TOTAL_TUPLES_PROCESS. Batches that carry no window stats,
+    * no metrics map or no value for the metric are ignored instead of raising an exception.
+    * @return always null
+    */
+   @Override
+   public StatsListener.Response processStats(StatsListener.BatchedOperatorStats stats)
+   {
+     java.util.List<Stats.OperatorStats> windowedStats = stats.getLastWindowedStats();
+     if (windowedStats == null || windowedStats.isEmpty()) {
+       return null;
+     }
+     Stats.OperatorStats operatorStats = windowedStats.get(windowedStats.size() - 1);
+     if (operatorStats.metrics == null) {
+       return null;
+     }
+     Object metric = operatorStats.metrics.get("tuplesProcessedCompletely");
+     if (metric == null) {
+       return null;
+     }
+     tuplesProcessedCompletely = (Integer)metric;
+     if (tuplesProcessedCompletely >= TOTAL_TUPLES_PROCESS) {
+       latch.countDown();
+     }
+     return null;
+   }
+ }
+
+ /**
+ * Input operator that emits one TestEvent per emitTuples() call, with a random id in [0, 100).
+ */
+ public static class TestGenerator extends BaseOperator implements InputOperator
+ {
+   public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>();
+   private final transient Random r = new Random();
+
+   @Override
+   public void emitTuples()
+   {
+     final TestEvent generated = new TestEvent();
+     generated.setId(r.nextInt(100));
+     output.emit(generated);
+   }
+ }
+
+ /**
+ * Tuple type used on both inputs and on the output of the join in this test.
+ */
+ public static class TestEvent
+ {
+ // Join key; the application uses "id" as both the left and the right key expression.
+ public int id;
+ // Never assigned by TestGenerator, so it stays null unless set through setEventTime.
+ public Date eventTime;
+
+ public TestEvent()
+ {
+ }
+
+ public int getId()
+ {
+ return id;
+ }
+
+ public void setId(int id)
+ {
+ this.id = id;
+ }
+
+ public Date getEventTime()
+ {
+ return eventTime;
+ }
+
+ public void setEventTime(Date eventTime)
+ {
+ this.eventTime = eventTime;
+ }
+ }
+
+ /**
+ * Test application: two generators feed the two inputs of a PartitionTestJoinOperator that is
+ * partitioned NUM_OF_PARTITIONS ways, and the join output goes to a console operator.
+ */
+ public static class JoinApp implements StreamingApplication
+ {
+ // Kept in a field so the test can reach the operator's latch after the DAG is prepared.
+ public PartitionTestJoinOperator joinOp;
+
+ @Override
+ public void populateDAG(DAG dag, Configuration configuration)
+ {
+ TestGenerator gen1 = dag.addOperator("Generator1", new TestGenerator());
+ TestGenerator gen2 = dag.addOperator("Generator2", new TestGenerator());
+
+ joinOp = dag.addOperator("Join", new PartitionTestJoinOperator());
+ joinOp.setLeftKeyExpression("id");
+ joinOp.setRightKeyExpression("id");
+ joinOp.setIncludeFieldStr("id,eventTime;id,eventTime");
+ joinOp.setExpiryTime(10000L);
+
+ ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());
+
+ dag.addStream("Gen1ToJoin", gen1.output, joinOp.input1);
+ dag.addStream("Gen2ToJoin", gen2.output, joinOp.input2);
+ dag.addStream("JoinToConsole", joinOp.outputPort, console.input);
+ // All three ports carry TestEvent; the same TUPLE_CLASS attribute constant is used for each.
+ dag.setInputPortAttribute(joinOp.input1, DAG.InputPortMeta.TUPLE_CLASS,TestEvent.class);
+ dag.setInputPortAttribute(joinOp.input2, DAG.InputPortMeta.TUPLE_CLASS,TestEvent.class);
+ dag.setOutputPortAttribute(joinOp.outputPort, DAG.InputPortMeta.TUPLE_CLASS,TestEvent.class);
+ dag.setAttribute(joinOp, Context.OperatorContext.PARTITIONER,
+ new StatelessPartitioner<PartitionTestJoinOperator>(NUM_OF_PARTITIONS));
+ }
+ }
+
+ /**
+ * This test validates whether a tuple key goes to exactly one partition
+ */
+ @Test
+ public void testJoinOpStreamCodec() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ JoinApp app = new JoinApp();
+ lma.prepareDAG(app, conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+ app.joinOp.latch.await();
+ Assert.assertFalse(testFailed);
+ }
+
+}
[2/2] apex-malhar git commit: Merge branch
'APEXMALHAR-2100-InnerJoin' of
https://github.com/chaithu14/incubator-apex-malhar
Posted by ch...@apache.org.
Merge branch 'APEXMALHAR-2100-InnerJoin' of https://github.com/chaithu14/incubator-apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/822323d0
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/822323d0
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/822323d0
Branch: refs/heads/master
Commit: 822323d02725ddf24dfe725c4a57aa05b5d4b165
Parents: e44caa5 fe2da3e
Author: Chinmay Kolhatkar <ch...@datatorrent.com>
Authored: Wed Aug 24 13:30:34 2016 +0530
Committer: Chinmay Kolhatkar <ch...@datatorrent.com>
Committed: Wed Aug 24 13:30:34 2016 +0530
----------------------------------------------------------------------
.../lib/join/AbstractInnerJoinOperator.java | 340 ++++++++++++++++++
.../AbstractManagedStateInnerJoinOperator.java | 253 +++++++++++++
.../apex/malhar/lib/join/JoinStreamCodec.java | 46 +++
.../malhar/lib/join/POJOInnerJoinOperator.java | 246 +++++++++++++
.../state/managed/AbstractManagedStateImpl.java | 4 +-
.../managed/ManagedTimeStateMultiValue.java | 353 +++++++++++++++++++
.../lib/join/POJOInnerJoinOperatorTest.java | 351 ++++++++++++++++++
.../lib/join/POJOPartitionJoinOperatorTest.java | 194 ++++++++++
8 files changed, 1786 insertions(+), 1 deletion(-)
----------------------------------------------------------------------