You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by cs...@apache.org on 2016/02/27 00:30:29 UTC
[1/2] incubator-apex-malhar git commit: APEXMALHAR-1720 Implemented
Inmemory Join Operator. Supported Inner, LeftOuter,
RightOuter and FullOuter join types
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 8331f56da -> fd2f42bd9
APEXMALHAR-1720 Implemented Inmemory Join Operator. Supported Inner, LeftOuter, RightOuter and FullOuter join types
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/fc547d87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/fc547d87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/fc547d87
Branch: refs/heads/devel-3
Commit: fc547d871da868c73ed5554f6a26708916202bae
Parents: b7199b7
Author: Chaitanya <ch...@datatorrent.com>
Authored: Thu Jan 7 22:19:06 2016 +0530
Committer: Chaitanya <ch...@datatorrent.com>
Committed: Thu Jan 7 22:19:06 2016 +0530
----------------------------------------------------------------------
.../lib/join/AbstractJoinOperator.java | 435 +++++++++++++++++++
.../java/com/datatorrent/lib/join/Bucket.java | 91 ++++
.../com/datatorrent/lib/join/InMemoryStore.java | 107 +++++
.../com/datatorrent/lib/join/JoinStore.java | 91 ++++
.../datatorrent/lib/join/MapJoinOperator.java | 100 +++++
.../datatorrent/lib/join/POJOJoinOperator.java | 266 ++++++++++++
.../datatorrent/lib/join/TimeBasedStore.java | 333 ++++++++++++++
.../com/datatorrent/lib/join/TimeEvent.java | 50 +++
.../com/datatorrent/lib/join/TimeEventImpl.java | 121 ++++++
.../lib/join/MapTimeBasedJoinOperator.java | 118 +++++
.../lib/join/POJOTimeBasedJoinOperatorTest.java | 385 ++++++++++++++++
11 files changed, 2097 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
new file mode 100644
index 0000000..a3f43b5
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * <p>
+ * This is the base implementation of join operator. Operator receives tuples from two streams,
+ * applies the join operation based on constraint and emit the joined value.
+ * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
+ *
+ * <b>Properties:</b><br>
+ * <b>expiryTime</b>: Expiry time for stored tuples<br>
+ * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
+ * Ex: Field1,Field2;Field3,Field4<br>
+ * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
+ * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
+ * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
+ * <b>strategy</b>: Type of join operation. Default type is inner join<br>
+ * <br>
+ *
+ * <b> Example: </b> <br>
+ * Left input port receives customer details and right input port receives Order details.
+ * The Customer schema contains at least the key field ID and the time field RTime.
+ * The Order schema contains at least the key field CID and the time field OTime.
+ * Now, join the tuples of the Customer and Order streams where Customer.ID = Order.CID, with the constraint
+ * that matched tuples must have timestamps within 5 minutes of each other.
+ * Here, keyFields = ID,CID, timeFields = RTime,OTime and expiryTime = 5 minutes. <br>
+ *
+ *
+ * @displayName Abstract Join Operator
+ * @tags join
+ */
+@InterfaceStability.Unstable
+public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
+{
+ @AutoMetric
+ private long tuplesJoinedPerSec;
+ private double windowTimeSec;
+ protected int tuplesCount;
+ public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
+
+ // Strategy of Join operation, by default the option is inner join
+ protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
+ // This represents whether the processing tuple is from left port or not
+ protected boolean isLeft;
+
+ /**
+ * Input port of the left stream: flags the tuple as coming from the left side and
+ * hands it to {@link #processTuple(Object)}.
+ */
+ @InputPortFieldAnnotation
+ public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
+ {
+ @Override
+ public void process(T tuple)
+ {
+ // isLeft is operator state read by processTuple/createEvent, so it must be set first.
+ isLeft = true;
+ processTuple(tuple);
+ }
+ };
+ /**
+ * Input port of the right stream: flags the tuple as coming from the right side and
+ * hands it to {@link #processTuple(Object)}.
+ */
+ @InputPortFieldAnnotation
+ public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
+ {
+ @Override
+ public void process(T tuple)
+ {
+ isLeft = false;
+ processTuple(tuple);
+ }
+ };
+
+ // Stores for each of the input port
+ @NotNull
+ protected StoreContext leftStore;
+ @NotNull
+ protected StoreContext rightStore;
+ private String includeFieldStr;
+ private String keyFieldStr;
+ private String timeFieldStr;
+
+  /**
+   * Tells each store whether it backs the outer side of the join, sets up both stores, parses the
+   * configured field strings and computes the length of an application window in seconds.
+   *
+   * @param context operator context supplying the window attributes
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    // The left store keeps unmatched tuples for LEFT_OUTER and full OUTER joins.
+    final boolean leftIsOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN)
+        || strategy.equals(JoinStrategy.OUTER_JOIN);
+    leftStore.getStore().isOuterJoin(leftIsOuter);
+
+    // The right store keeps unmatched tuples for RIGHT_OUTER and full OUTER joins.
+    final boolean rightIsOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN)
+        || strategy.equals(JoinStrategy.OUTER_JOIN);
+    rightStore.getStore().isOuterJoin(rightIsOuter);
+
+    // The outer-join flags are set before the stores are set up.
+    leftStore.getStore().setup(context);
+    rightStore.getStore().setup(context);
+
+    populateFields();
+
+    // windows per application window * milliseconds per window, converted to seconds
+    windowTimeSec = (context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+        context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * 1.0) / 1000.0;
+  }
+
+ /**
+ * Create the event with the given tuple. If it successfully inserted it into the store
+ * then it does the join operation
+ *
+ * @param tuple Tuple to process
+ */
+ protected void processTuple(T tuple)
+ {
+ JoinStore store = isLeft ? leftStore.getStore() : rightStore.getStore();
+ TimeEvent t = createEvent(tuple);
+ if (store.put(t)) {
+ join(t, isLeft);
+ }
+ }
+
+  /**
+   * Parses the include, key and (when configured) time field strings into the store contexts.
+   */
+  private void populateFields()
+  {
+    populateIncludeFields();
+    populateKeyFields();
+    if (timeFieldStr == null) {
+      // Time fields are optional.
+      return;
+    }
+    populateTimeFields();
+  }
+
+  /**
+   * Parses includeFieldStr, which has the form "leftA,leftB;rightA,rightB", and stores the
+   * per-stream field names in the left and right store contexts. Whitespace around the field
+   * names is ignored, so "a, b; c, d" is accepted.
+   *
+   * @throws IllegalArgumentException if includeFieldStr is not set or does not consist of exactly
+   *                                  two ';' separated parts
+   */
+  private void populateIncludeFields()
+  {
+    if (includeFieldStr == null) {
+      throw new IllegalArgumentException("includeFields is not set; expected <left fields>;<right fields>");
+    }
+    String[] portFields = includeFieldStr.split(";");
+    // An explicit check instead of assert: assertions are normally disabled at runtime.
+    if (portFields.length != 2) {
+      throw new IllegalArgumentException(
+          "includeFields must be of the form <left fields>;<right fields>, found: " + includeFieldStr);
+    }
+    leftStore.setIncludeFields(splitAndTrim(portFields[0]));
+    rightStore.setIncludeFields(splitAndTrim(portFields[1]));
+  }
+
+  /**
+   * Splits a comma separated list and strips the whitespace surrounding every entry.
+   */
+  private static String[] splitAndTrim(String commaSeparated)
+  {
+    String[] names = commaSeparated.split(",");
+    for (int i = 0; i < names.length; i++) {
+      names[i] = names[i].trim();
+    }
+    return names;
+  }
+
+  /**
+   * Joins the given event with the matching events of the opposite store and emits the result.
+   * <p>
+   * When {@code tuple} is null the call comes from endWindow() for an outer join: the unmatched
+   * events of the selected store are emitted, with the fields of the other side copied from null.
+   *
+   * @param tuple  input event, or null to emit the unmatched events of the selected store
+   * @param isLeft whether the given tuple is from the first port; the store that is queried is
+   *               the opposite one
+   */
+  @SuppressWarnings("unchecked")
+  private void join(TimeEvent tuple, boolean isLeft)
+  {
+    JoinStore store = isLeft ? rightStore.getStore() : leftStore.getStore();
+
+    // JoinStore only promises a List<?>, so do not assume ArrayList.
+    List<TimeEvent> candidates;
+    if (tuple != null) {
+      candidates = (List<TimeEvent>)store.getValidTuples(tuple);
+    } else {
+      candidates = (List<TimeEvent>)store.getUnMatchedTuples();
+    }
+
+    // Nothing to join: leave the tuple unmatched so an outer join can still emit it later.
+    if (candidates == null || candidates.isEmpty()) {
+      return;
+    }
+
+    // The value of the incoming tuple is the same for every joined row.
+    Object tupleValue = (tuple != null) ? tuple.getValue() : null;
+
+    List<T> result = new ArrayList<>(candidates.size());
+    for (TimeEvent joinedValue : candidates) {
+      T output = createOutputTuple();
+      copyValue(output, tupleValue, isLeft);
+      copyValue(output, joinedValue.getValue(), !isLeft);
+      result.add(output);
+      joinedValue.setMatch(true);
+    }
+    if (tuple != null) {
+      tuple.setMatch(true);
+    }
+    outputPort.emit(result);
+    tuplesCount += result.size();
+  }
+
+  /**
+   * Emits the unmatched tuples of the outer side(s) when the strategy is an outer join, then
+   * forwards the end of the window to both stores and refreshes the tuplesJoinedPerSec metric.
+   */
+  @Override
+  public void endWindow()
+  {
+    final boolean leftOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN);
+    final boolean rightOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN);
+    final boolean fullOuter = strategy.equals(JoinStrategy.OUTER_JOIN);
+
+    if (leftOuter || fullOuter) {
+      // A null tuple with isLeft == false makes join() read the unmatched tuples of the left store.
+      join(null, false);
+    }
+    if (rightOuter || fullOuter) {
+      // A null tuple with isLeft == true makes join() read the unmatched tuples of the right store.
+      join(null, true);
+    }
+
+    leftStore.getStore().endWindow();
+    rightStore.getStore().endWindow();
+
+    tuplesJoinedPerSec = (long)(tuplesCount / windowTimeSec);
+  }
+
+  /**
+   * Forwards the start of the window to both stores and resets the per-window join counters.
+   *
+   * @param windowId identifier of the window that is starting
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    // endWindow, checkpointed and committed are forwarded to the stores; do the same here.
+    leftStore.getStore().beginWindow(windowId);
+    rightStore.getStore().beginWindow(windowId);
+    tuplesJoinedPerSec = 0;
+    tuplesCount = 0;
+  }
+
+ /**
+ * Forwards the checkpoint notification to the left and right stores.
+ *
+ * @param windowId window id passed on to both stores
+ */
+ @Override
+ public void checkpointed(long windowId)
+ {
+ leftStore.getStore().checkpointed(windowId);
+ rightStore.getStore().checkpointed(windowId);
+ }
+
+ /**
+ * Forwards the commit notification to the left and right stores.
+ *
+ * @param windowId window id passed on to both stores
+ */
+ @Override
+ public void committed(long windowId)
+ {
+ leftStore.getStore().committed(windowId);
+ rightStore.getStore().committed(windowId);
+ }
+
+  /**
+   * Converts the given tuple to an event, using the key and time field of the side that
+   * {@code isLeft} currently points to. When no time field is configured for that side the
+   * current system time is used as the event time.
+   *
+   * @param tuple Given tuple to convert into event
+   * @return event
+   * @throws IllegalArgumentException if a time field is configured but its value is not numeric
+   */
+  protected TimeEvent createEvent(Object tuple)
+  {
+    StoreContext side = isLeft ? leftStore : rightStore;
+    String key = side.getKeys();
+    String timeField = side.getTimeFields();
+
+    Object keyValue = getKeyValue(key, tuple);
+    if (timeField == null) {
+      return new TimeEventImpl(keyValue, System.currentTimeMillis(), tuple);
+    }
+
+    // Accept any numeric type (Long, Integer, ...) instead of insisting on java.lang.Long.
+    Object time = getTime(timeField, tuple);
+    if (!(time instanceof Number)) {
+      throw new IllegalArgumentException(
+          "Time field '" + timeField + "' must hold a numeric timestamp, found: " + time);
+    }
+    return new TimeEventImpl(keyValue, ((Number)time).longValue(), tuple);
+  }
+
+  /**
+   * Parses keyFieldStr ("leftKey,rightKey") and stores the key field of each stream in its store
+   * context. Whitespace around the names is ignored; entries after the second one are ignored.
+   *
+   * @throws IllegalArgumentException if keyFieldStr is not set or names fewer than two fields
+   */
+  private void populateKeyFields()
+  {
+    if (keyFieldStr == null) {
+      throw new IllegalArgumentException("keyFields is not set; expected <left key>,<right key>");
+    }
+    String[] keys = keyFieldStr.split(",");
+    if (keys.length < 2) {
+      throw new IllegalArgumentException(
+          "keyFields must be of the form <left key>,<right key>, found: " + keyFieldStr);
+    }
+    leftStore.setKeys(keys[0].trim());
+    rightStore.setKeys(keys[1].trim());
+  }
+
+ /** @return the join strategy currently configured */
+ public JoinStrategy getStrategy()
+ {
+ return strategy;
+ }
+
+ /** @param strategy join strategy to use */
+ public void setStrategy(JoinStrategy strategy)
+ {
+ this.strategy = strategy;
+ }
+
+ /** Wraps the given store in a new StoreContext and uses it for the left stream. */
+ public void setLeftStore(@NotNull JoinStore lStore)
+ {
+ leftStore = new StoreContext(lStore);
+ }
+
+ /** Wraps the given store in a new StoreContext and uses it for the right stream. */
+ public void setRightStore(@NotNull JoinStore rStore)
+ {
+ rightStore = new StoreContext(rStore);
+ }
+
+ /** @param keyFieldStr comma separated key fields, left stream first; parsed during setup */
+ public void setKeyFields(String keyFieldStr)
+ {
+ this.keyFieldStr = keyFieldStr;
+ }
+
+ /** @param timeFieldStr comma separated time fields, left stream first; may stay null */
+ public void setTimeFieldStr(String timeFieldStr)
+ {
+ this.timeFieldStr = timeFieldStr;
+ }
+
+ /** @param includeFieldStr output fields in the form "left fields;right fields"; parsed during setup */
+ public void setIncludeFields(String includeFieldStr)
+ {
+ this.includeFieldStr = includeFieldStr;
+ }
+
+ /** @return the context wrapping the store of the left stream */
+ public StoreContext getLeftStore()
+ {
+ return leftStore;
+ }
+
+ /** @return the context wrapping the store of the right stream */
+ public StoreContext getRightStore()
+ {
+ return rightStore;
+ }
+
+ /** @return the unparsed include field string as it was set */
+ public String getIncludeFieldStr()
+ {
+ return includeFieldStr;
+ }
+
+ /** @return the unparsed key field string as it was set */
+ public String getKeyFieldStr()
+ {
+ return keyFieldStr;
+ }
+
+ /** @return the unparsed time field string as it was set, or null when none was set */
+ public String getTimeFieldStr()
+ {
+ return timeFieldStr;
+ }
+
+  /**
+   * Parses timeFieldStr ("leftTime,rightTime") and stores the time field of each stream in its
+   * store context. Whitespace around the names is ignored; entries after the second one are
+   * ignored. Only called when timeFieldStr is set.
+   *
+   * @throws IllegalArgumentException if timeFieldStr names fewer than two fields
+   */
+  private void populateTimeFields()
+  {
+    String[] timeFields = timeFieldStr.split(",");
+    if (timeFields.length < 2) {
+      throw new IllegalArgumentException(
+          "timeFields must be of the form <left time field>,<right time field>, found: " + timeFieldStr);
+    }
+    leftStore.setTimeFields(timeFields[0].trim());
+    rightStore.setTimeFields(timeFields[1].trim());
+  }
+
+  /**
+   * Sets the join strategy from its name, ignoring case and surrounding whitespace.
+   *
+   * @param policy name of a {@link JoinStrategy} constant, e.g. "inner_join"
+   * @throws IllegalArgumentException if the name does not match any JoinStrategy constant
+   */
+  public void setStrategy(String policy)
+  {
+    // Locale.ROOT keeps the upper-casing independent of the JVM default locale
+    // (a Turkish locale, for example, would map 'i' to a dotted capital I).
+    this.strategy = JoinStrategy.valueOf(policy.trim().toUpperCase(java.util.Locale.ROOT));
+  }
+
+ /**
+ * Create the output object
+ *
+ * @return output tuple
+ */
+ protected abstract T createOutputTuple();
+
+ /**
+ * Get the values from extractTuple and set these values to the output
+ *
+ * @param output output tuple to populate
+ * @param extractTuple Extract the values from this tuple
+ * @param isLeft Whether the extracted tuple belongs to left stream or not
+ */
+ protected abstract void copyValue(T output, Object extractTuple, boolean isLeft);
+
+ /**
+ * Get the value of the key field from the given tuple
+ *
+ * @param keyField Value of the field to extract from given tuple
+ * @param tuple Given tuple
+ * @return the value of field from given tuple
+ */
+ protected abstract Object getKeyValue(String keyField, Object tuple);
+
+ /**
+ * Get the value of the time field from the given tuple
+ *
+ * @param field Time field
+ * @param tuple given tuple
+ * @return the value of time field from given tuple
+ */
+ protected abstract Object getTime(String field, Object tuple);
+
+  /**
+   * Supported join types. The operator treats the left store as the outer side for
+   * LEFT_OUTER_JOIN, the right store for RIGHT_OUTER_JOIN, and both for OUTER_JOIN.
+   */
+  public enum JoinStrategy
+  {
+    INNER_JOIN,
+    LEFT_OUTER_JOIN,
+    RIGHT_OUTER_JOIN,
+    OUTER_JOIN
+  }
+
+ /**
+ * Bundles the store of one input stream with the field names configured for that stream.
+ * The field names are transient; the operator fills them in from its field strings during setup.
+ */
+ public static class StoreContext
+ {
+ // Name of the time field of this stream; stays null when no time fields are configured.
+ private transient String timeFields;
+ // Names of the fields of this stream that are copied to the output tuple.
+ private transient String[] includeFields;
+ // Name of the key field of this stream.
+ private transient String keys;
+ // Store holding the events of this stream.
+ private JoinStore store;
+
+ /**
+ * @param store store that holds the events of this stream
+ */
+ public StoreContext(JoinStore store)
+ {
+ this.store = store;
+ }
+
+ /** @return the time field name, or null when none was set */
+ public String getTimeFields()
+ {
+ return timeFields;
+ }
+
+ /** @param timeFields time field name of this stream */
+ public void setTimeFields(String timeFields)
+ {
+ this.timeFields = timeFields;
+ }
+
+ /** @return the names of the fields copied to the output tuple */
+ public String[] getIncludeFields()
+ {
+ return includeFields;
+ }
+
+ /** @param includeFields names of the fields copied to the output tuple */
+ public void setIncludeFields(String[] includeFields)
+ {
+ this.includeFields = includeFields;
+ }
+
+ /** @return the key field name of this stream */
+ public String getKeys()
+ {
+ return keys;
+ }
+
+ /** @param keys key field name of this stream */
+ public void setKeys(String keys)
+ {
+ this.keys = keys;
+ }
+
+ /** @return the store of this stream */
+ public JoinStore getStore()
+ {
+ return store;
+ }
+
+ /** @param store store to use for this stream */
+ public void setStore(JoinStore store)
+ {
+ this.store = store;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/Bucket.java b/library/src/main/java/com/datatorrent/lib/join/Bucket.java
new file mode 100644
index 0000000..13ea496
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/Bucket.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * <p>
+ * This is the base implementation of bucket which contains all the events which belong to the same bucket.
+ * </p>
+ *
+ * @param <T> type of bucket events
+ */
+@InterfaceStability.Unstable
+public class Bucket<T extends TimeEvent>
+{
+  /** Identifier of this bucket; -1 when created through the no-arg constructor. */
+  public final long bucketKey;
+  /** Events of this bucket grouped by event key; created lazily on the first insert. */
+  protected Map<Object, List<T>> unwrittenEvents;
+
+  /**
+   * Creates a bucket with the placeholder key -1.
+   */
+  public Bucket()
+  {
+    bucketKey = -1L;
+  }
+
+  /**
+   * Creates a bucket with the given key.
+   *
+   * @param bucketKey identifier of the bucket
+   */
+  protected Bucket(long bucketKey)
+  {
+    this.bucketKey = bucketKey;
+  }
+
+  /**
+   * Add the given event to the list of events stored under the given key.
+   *
+   * @param eventKey event key
+   * @param event    event to add
+   */
+  protected void addNewEvent(Object eventKey, T event)
+  {
+    if (unwrittenEvents == null) {
+      unwrittenEvents = Maps.newHashMap();
+    }
+    List<T> listEvents = unwrittenEvents.get(eventKey);
+    if (listEvents == null) {
+      unwrittenEvents.put(eventKey, Lists.newArrayList(event));
+    } else {
+      listEvents.add(event);
+    }
+  }
+
+  /**
+   * Return the unwritten events in the bucket
+   *
+   * @return the events grouped by key, or null when no event has been added yet
+   */
+  public Map<Object, List<T>> getEvents()
+  {
+    return unwrittenEvents;
+  }
+
+  /**
+   * Return the list of events for the given key
+   *
+   * @param key given key
+   * @return the list of events, or null when the bucket holds no event for the key
+   */
+  public List<T> get(Object key)
+  {
+    // The map is created lazily, so it is still null on a bucket that never received an event.
+    if (unwrittenEvents == null) {
+      return null;
+    }
+    return unwrittenEvents.get(key);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/InMemoryStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/InMemoryStore.java b/library/src/main/java/com/datatorrent/lib/join/InMemoryStore.java
new file mode 100644
index 0000000..7161faa
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/InMemoryStore.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+
+/**
+ * Wrapper class for TimeBased Store.
+ */
+@InterfaceStability.Unstable
+public class InMemoryStore extends TimeBasedStore<TimeEvent> implements JoinStore
+{
+ /**
+ * Creates a store whose span and bucket span are left to be configured separately.
+ */
+ public InMemoryStore()
+ {
+ }
+
+ /**
+ * Creates a store with the given time spans.
+ *
+ * @param spanTimeInMillis value passed to setSpanTimeInMillis
+ * @param bucketSpanInMillis value passed to setBucketSpanInMillis
+ */
+ public InMemoryStore(long spanTimeInMillis, int bucketSpanInMillis)
+ {
+ super();
+ setSpanTimeInMillis(spanTimeInMillis);
+ setBucketSpanInMillis(bucketSpanInMillis);
+ }
+
+ /** No-op in this implementation. */
+ @Override
+ public void committed(long windowId)
+ {
+
+ }
+
+ /** No-op in this implementation. */
+ @Override
+ public void checkpointed(long windowId)
+ {
+
+ }
+
+ /** No-op in this implementation. */
+ @Override
+ public void beginWindow(long windowId)
+ {
+
+ }
+
+ /** No-op in this implementation. */
+ @Override
+ public void endWindow()
+ {
+
+ }
+
+ /**
+ * @return the result of TimeBasedStore#getUnmatchedEvents()
+ */
+ @Override
+ public List<TimeEvent> getUnMatchedTuples()
+ {
+ return super.getUnmatchedEvents();
+ }
+
+ /**
+ * Passes the outer-join flag on to the TimeBasedStore implementation.
+ *
+ * @param isOuter whether this store backs the outer side of the join
+ */
+ @Override
+ public void isOuterJoin(boolean isOuter)
+ {
+ super.isOuterJoin(isOuter);
+ }
+
+ /**
+ * Delegates to TimeBasedStore#getValidTuples.
+ *
+ * @param tuple event to match; must be a TimeEvent, otherwise the cast fails
+ * @return whatever the underlying store returns for the event
+ */
+ @Override
+ public List<TimeEvent> getValidTuples(Object tuple)
+ {
+ return super.getValidTuples((TimeEvent)tuple);
+ }
+
+ /**
+ * Delegates to TimeBasedStore#put.
+ *
+ * @param tuple event to insert; must be a TimeEvent, otherwise the cast fails
+ * @return the result reported by the underlying store
+ */
+ @Override
+ public boolean put(Object tuple)
+ {
+ return super.put((TimeEvent)tuple);
+ }
+
+ // NOTE(review): this logger is not used anywhere in the class.
+ private static final Logger logger = LoggerFactory.getLogger(InMemoryStore.class);
+
+ /**
+ * Calls the no-arg setup of TimeBasedStore; the context argument is not used.
+ */
+ @Override
+ public void setup(Context context)
+ {
+ super.setup();
+ }
+
+ /**
+ * Calls TimeBasedStore#shutdown().
+ */
+ @Override
+ public void teardown()
+ {
+ super.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/JoinStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/JoinStore.java b/library/src/main/java/com/datatorrent/lib/join/JoinStore.java
new file mode 100644
index 0000000..7b95acc
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/JoinStore.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Component;
+
+/**
+ * <p>
+ * Interface of store for join operation.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public interface JoinStore extends Component
+{
+ /**
+ * Perform the committed operation
+ * @param windowId id of the window the notification refers to
+ */
+ void committed(long windowId);
+
+ /**
+ * Save the state of store
+ * @param windowId id of the window the notification refers to
+ */
+ void checkpointed(long windowId);
+
+ /**
+ * Perform any operations the store needs before a window begins
+ * @param windowId id of the window that is about to begin
+ */
+ void beginWindow(long windowId);
+
+ /**
+ * Perform any operations the store needs at the end of a window
+ */
+ void endWindow();
+
+ /**
+ * Get the key from the given tuple and with that key, get the tuples which satisfy the join constraint
+ * from the store.
+ *
+ * @param tuple Given tuple
+ * @return the valid tuples which satisfy the join constraint
+ */
+ List<?> getValidTuples(Object tuple);
+
+ /**
+ * Insert the given tuple
+ *
+ * @param tuple Given tuple
+ * @return a flag reported by the store; AbstractJoinOperator joins the tuple only when it is true
+ */
+ boolean put(Object tuple);
+
+ /**
+ * Return the unmatched events from store
+ *
+ * @return the unmatched events
+ */
+ List<?> getUnMatchedTuples();
+
+ /**
+ * Set if the join type is outer
+ *
+ * @param isOuter Specifies the join type is outer join or not
+ */
+ void isOuterJoin(boolean isOuter);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/MapJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/MapJoinOperator.java
new file mode 100644
index 0000000..3e23f73
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/MapJoinOperator.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This class takes a HashMap tuple as input from each of the input ports. The operator joins the input tuples
+ * based on the join constraint and emits the result.
+ *
+ * <br>
+ * <b>Ports : </b> <br>
+ * <b> input1 : </b> Input port for stream 1, expects {@code HashMap<String, Object>} <br>
+ * <b> input2 : </b> Input port for stream 2, expects {@code HashMap<String, Object>} <br>
+ * <b> outputPort: </b> Output port emits {@code ArrayList<HashMap<String, Object>>} <br>
+ * <br>
+ * <b>Example:</b>
+ * Input tuple from port1 is
+ * {timestamp = 5000, productId = 3, customerId = 108, regionId = 4, amount = $560 }
+ *
+ * Input tuple from port2 is
+ * { timestamp = 5500, productCategory = 8, productId=3 }
+ *
+ * <b>Properties: </b>
+ * <b>expiryTime</b>: 1000<br>
+ * <b>includeFieldStr</b>: timestamp, customerId, amount; productCategory, productId<br>
+ * <b>keyFields</b>: productId, productId<br>
+ * <b>timeFields</b>: timestamp, timestamp<br>
+ * <b>bucketSpanInMillis</b>: 500<br>
+ *
+ * <b>Output</b>
+ * { timestamp = 5000, customerId = 108, amount = $560, productCategory = 8, productId=3}
+ *
+ * @displayName MapJoin Operator
+ * @category join
+ * @tags join
+ */
+@InterfaceStability.Unstable
+public class MapJoinOperator extends AbstractJoinOperator<Map<String, Object>>
+{
+  /**
+   * @return a new, empty map that receives the joined fields
+   */
+  @Override
+  protected Map<String, Object> createOutputTuple()
+  {
+    return new HashMap<>();
+  }
+
+  /**
+   * Copies the include fields of one side from extractTuple into the output map. When
+   * extractTuple is null (unmatched side of an outer join) every include field is put with a
+   * null value.
+   *
+   * @param output       map being populated
+   * @param extractTuple source map, or null
+   * @param isLeft       whether the include fields of the left or the right stream are copied
+   */
+  @Override
+  protected void copyValue(Map<String, Object> output, Object extractTuple, boolean isLeft)
+  {
+    String[] fields = isLeft ? leftStore.getIncludeFields() : rightStore.getIncludeFields();
+    // Cast once instead of once per field.
+    @SuppressWarnings("unchecked")
+    Map<String, Object> source = (Map<String, Object>)extractTuple;
+    for (String field : fields) {
+      output.put(field, source == null ? null : source.get(field));
+    }
+  }
+
+  /**
+   * @param keyField name of the key field
+   * @param tuple    input map
+   * @return the value stored under keyField in the input map
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public Object getKeyValue(String keyField, Object tuple)
+  {
+    return ((Map<String, Object>)tuple).get(keyField);
+  }
+
+  /**
+   * @param field name of the time field
+   * @param tuple input map
+   * @return the value stored under field when time fields are configured, otherwise the
+   *         current system time in milliseconds
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  protected Object getTime(String field, Object tuple)
+  {
+    if (getTimeFieldStr() != null) {
+      return ((Map<String, Object>)tuple).get(field);
+    }
+    return System.currentTimeMillis();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/POJOJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/POJOJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/POJOJoinOperator.java
new file mode 100644
index 0000000..34101ff
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/POJOJoinOperator.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.Calendar;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * This class takes a POJO as input from each of the input ports. The operator joins the input tuples
+ * based on the join constraint and emits the result.
+ *
+ * <br>
+ * <b>Ports : </b> <br>
+ * <b> input1 : </b> Input port for stream 1, expects POJO <br>
+ * <b> input2 : </b> Input port for stream 2, expects POJO <br>
+ * <b> outputPort: </b> Output port emits POJO <br>
+ * <br>
+ * <b>Example:</b>
+ * Input tuple from port1 is
+ * {timestamp = 5000, productId = 3, customerId = 108, regionId = 4, amount = $560 }
+ *
+ * Input tuple from port2 is
+ * { timestamp = 5500, productCategory = 8, productId=3 }
+ *
+ * <b>Properties: </b>
+ * <b>expiryTime</b>: 1000<br>
+ * <b>includeFieldStr</b>: timestamp, customerId, amount; productCategory, productId<br>
+ * <b>keyFields</b>: productId, productId<br>
+ * <b>timeFields</b>: timestamp, timestamp<br>
+ * <b>bucketSpanInMillis</b>: 500<br>
+ *
+ * <b>Output</b>
+ * { timestamp = 5000, customerId = 108, amount = $560, productCategory = 8, productId=3}
+ *
+ * @displayName BeanJoin Operator
+ * @category join
+ * @tags join
+ */
+@InterfaceStability.Unstable
+public class POJOJoinOperator extends AbstractJoinOperator
+{
+ protected Class outputClass;
+ protected transient Class leftClass;
+ protected transient Class rightClass;
+ private String outputClassStr;
+ private transient List<FieldObjectMap>[] fieldMap = (List<FieldObjectMap>[])Array.newInstance(
+ (new LinkedList<FieldObjectMap>()).getClass(), 2);
+ private transient PojoUtils.Getter[] keyGetters = (PojoUtils.Getter[])Array.newInstance(PojoUtils.Getter.class, 2);
+ private transient PojoUtils.Getter[] timeGetters = (PojoUtils.Getter[])Array.newInstance(PojoUtils.Getter.class, 2);
+
+ /**
+ * Makes sure the getters for the class of the incoming tuple exist (they are created from the
+ * first tuple seen on each side) before running the regular tuple processing.
+ *
+ * @param tuple input POJO; the side it belongs to is taken from isLeft
+ */
+ @Override
+ protected void processTuple(Object tuple)
+ {
+ setAndPopulateGetters(tuple, isLeft);
+ super.processTuple(tuple);
+ }
+
+  /**
+   * Records the class of the first tuple seen on a side and builds the getters for that class.
+   * Later tuples of the same side reuse them.
+   *
+   * @param tuple  Given tuple
+   * @param isLeft Whether the given tuple belongs to left stream or not
+   */
+  private void setAndPopulateGetters(Object tuple, boolean isLeft)
+  {
+    if (isLeft) {
+      if (leftClass == null) {
+        leftClass = tuple.getClass();
+        populateGettersFromInput(true);
+      }
+    } else if (rightClass == null) {
+      rightClass = tuple.getClass();
+      populateGettersFromInput(false);
+    }
+  }
+
+ /**
+ * Populate the getters from the input class
+ *
+ * @param isLeft isLeft specifies whether the class is left or right
+ */
+ private void populateGettersFromInput(boolean isLeft)
+ {
+ Class inputClass;
+ int idx;
+ StoreContext store;
+ if (isLeft) {
+ idx = 0;
+ inputClass = leftClass;
+ store = leftStore;
+ } else {
+ idx = 1;
+ inputClass = rightClass;
+ store = rightStore;
+ }
+ String key = store.getKeys();
+ String timeField = store.getTimeFields();
+ String[] fields = store.getIncludeFields();
+
+ // Create getter for the key field
+ try {
+ Class c = ClassUtils.primitiveToWrapper(inputClass.getField(key).getType());
+ keyGetters[idx] = PojoUtils.createGetter(inputClass, key, c);
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException(e);
+ }
+
+ // Create getter for time field
+ if (timeField != null) {
+ try {
+ Class c = ClassUtils.primitiveToWrapper(inputClass.getField(timeField).getType());
+ timeGetters[idx] = PojoUtils.createGetter(inputClass, timeField, c);
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ fieldMap[idx] = new LinkedList<FieldObjectMap>();
+ List<FieldObjectMap> fieldsMap = fieldMap[idx];
+ // Create getters for the include fields
+ for (String f : fields) {
+ try {
+ Field field = inputClass.getField(f);
+ Class c;
+ if (field.getType().isPrimitive()) {
+ c = ClassUtils.primitiveToWrapper(field.getType());
+ } else {
+ c = field.getType();
+ }
+ FieldObjectMap fm = new FieldObjectMap();
+ fm.get = PojoUtils.createGetter(inputClass, f, c);
+ fm.set = PojoUtils.createSetter(outputClass, f, c);
+ fieldsMap.add(fm);
+ } catch (Throwable e) {
+ throw new RuntimeException("Failed to populate gettter for field: " + f, e);
+ }
+ }
+ }
+
+  /**
+   * Instantiates the configured output class through its no-arg constructor.
+   *
+   * @return the new output object
+   */
+  @Override
+  protected Object createOutputTuple()
+  {
+    try {
+      return outputClass.newInstance();
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+ /**
+ * Copy the field values of extractTuple to output object
+ *
+ * @param output
+ * @param extractTuple
+ * @param isLeft
+ */
+ @Override
+ protected void copyValue(Object output, Object extractTuple, boolean isLeft)
+ {
+ if (extractTuple == null) {
+ return;
+ }
+
+ setAndPopulateGetters(extractTuple, isLeft);
+
+ List<FieldObjectMap> fieldsMap;
+ if (isLeft) {
+ fieldsMap = fieldMap[0];
+ } else {
+ fieldsMap = fieldMap[1];
+ }
+
+ for (FieldObjectMap map : fieldsMap) {
+ map.set.set(output, map.get.get(extractTuple));
+ }
+ }
+
+  /**
+   * Return the key field value of the tuple, read through the key getter of the side that
+   * isLeft currently points to. The keyField argument is not used; the getter was built from
+   * the configured key field.
+   *
+   * @param keyField name of the key field (unused)
+   * @param tuple    input POJO
+   * @return the key value of the given tuple
+   */
+  @Override
+  public Object getKeyValue(String keyField, Object tuple)
+  {
+    return keyGetters[isLeft ? 0 : 1].get(tuple);
+  }
+
+ @Override
+ protected Object getTime(String field, Object tuple)
+ {
+ if (getTimeFieldStr() != null) {
+ if (isLeft) {
+ return timeGetters[0].get(tuple);
+ }
+ return timeGetters[1].get(tuple);
+ }
+ return Calendar.getInstance().getTimeInMillis();
+ }
+
+ /**
+ * Loads the class named by outputClassStr with the class loader of this operator class and
+ * stores it as the output class.
+ *
+ * @throws RuntimeException wrapping the ClassNotFoundException when the class cannot be found
+ */
+ public void populateOutputClass()
+ {
+ try {
+ this.outputClass = this.getClass().getClassLoader().loadClass(outputClassStr);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * @return the name of the output class as it was set (the string, not the loaded Class)
+ */
+ public String getOutputClass()
+ {
+ return outputClassStr;
+ }
+
+ /**
+ * Stores the output class name and loads the class right away.
+ *
+ * @param outputClassStr name of the output class, passed to ClassLoader#loadClass
+ */
+ public void setOutputClass(String outputClassStr)
+ {
+ this.outputClassStr = outputClassStr;
+ populateOutputClass();
+ }
+
+  /**
+   * Pairs the getter that reads an include field from an input POJO with the setter that writes
+   * the same field on the output POJO. Static because it needs no reference to the enclosing operator.
+   */
+  private static class FieldObjectMap
+  {
+    public PojoUtils.Getter get;
+    public PojoUtils.Setter set;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/TimeBasedStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/TimeBasedStore.java b/library/src/main/java/com/datatorrent/lib/join/TimeBasedStore.java
new file mode 100644
index 0000000..88597f3
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/TimeBasedStore.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Base implementation of time based store for key-value pair tuples.
+ *
+ * @param <T>
+ */
+@InterfaceStability.Unstable
+public class TimeBasedStore<T extends TimeEvent>
+{
+ private static final Logger logger = LoggerFactory.getLogger(TimeBasedStore.class);
+
+ private final transient Lock lock;
+ @Min(1)
+ protected int noOfBuckets;
+ protected Bucket<T>[] buckets;
+ @Min(1)
+ protected long expiryTimeInMillis;
+ @Min(1)
+ protected long spanTimeInMillis;
+ protected int bucketSpanInMillis;
+ protected long startOfBucketsInMillis;
+ protected long endOBucketsInMillis;
+ protected transient Map<Long, Bucket> dirtyBuckets = new HashMap<Long, Bucket>();
+ private boolean isOuter = false;
+ private List<T> unmatchedEvents = new ArrayList<T>();
+ private Map<Object, Set<Long>> key2Buckets = new ConcurrentHashMap<Object, Set<Long>>();
+ private transient Timer bucketSlidingTimer;
+
+ public TimeBasedStore()
+ {
+ lock = new Lock();
+ }
+
+ /**
+ * Compute the number of buckets based on spantime and bucketSpanInMillis
+ */
+ private void recomputeNumBuckets()
+ {
+ Calendar calendar = Calendar.getInstance();
+ long now = calendar.getTimeInMillis();
+ startOfBucketsInMillis = now - spanTimeInMillis;
+ expiryTimeInMillis = startOfBucketsInMillis;
+ endOBucketsInMillis = now;
+ noOfBuckets = (int)Math.ceil((now - startOfBucketsInMillis) / (bucketSpanInMillis * 1.0));
+ buckets = (Bucket<T>[])Array.newInstance(Bucket.class, noOfBuckets);
+ }
+
+ /**
+ * Compute the buckets and start the service
+ */
+ public void setup()
+ {
+ setBucketSpanInMillis((int)(spanTimeInMillis > (long)bucketSpanInMillis ? bucketSpanInMillis : spanTimeInMillis));
+ if (buckets == null) {
+ recomputeNumBuckets();
+ }
+ startService();
+ }
+
+ /**
+ * Return the tuples which satisfies the join constraint
+ *
+ * @param tuple
+ * @return the list of events
+ */
+ public List<TimeEvent> getValidTuples(T tuple)
+ {
+ // Get the key from the given tuple
+ Object key = tuple.getEventKey();
+ // Get the buckets where the key is present
+ Set<Long> keyBuckets = key2Buckets.get(key);
+ if (keyBuckets == null) {
+ return null;
+ }
+ List<TimeEvent> validTuples = new ArrayList<TimeEvent>();
+ for (Long idx : keyBuckets) {
+ int bucketIdx = (int)(idx % noOfBuckets);
+ Bucket tb = buckets[bucketIdx];
+ if (tb == null || tb.bucketKey != idx) {
+ continue;
+ }
+ List<T> events = tb.get(key);
+ if (events != null) {
+ validTuples.addAll(events);
+ }
+ }
+ return validTuples;
+ }
+
+ /**
+ * Insert the given tuple into the bucket
+ *
+ * @param tuple
+ */
+ public boolean put(T tuple)
+ {
+ long bucketKey = getBucketKeyFor(tuple);
+ if (bucketKey < 0) {
+ return false;
+ }
+ newEvent(bucketKey, tuple);
+ return true;
+ }
+
+ /**
+ * Calculates the bucket key for the given event
+ *
+ * @param event
+ * @return the bucket key
+ */
+ public long getBucketKeyFor(T event)
+ {
+ long eventTime = event.getTime();
+ // Negative indicates the invalid events
+ if (eventTime < expiryTimeInMillis) {
+ return -1;
+ }
+ long diffFromStart = eventTime - startOfBucketsInMillis;
+ long key = diffFromStart / bucketSpanInMillis;
+ synchronized (lock) {
+ if (eventTime > endOBucketsInMillis) {
+ long move = ((eventTime - endOBucketsInMillis) / bucketSpanInMillis + 1) * bucketSpanInMillis;
+ expiryTimeInMillis += move;
+ endOBucketsInMillis += move;
+ }
+ }
+ return key;
+ }
+
+ /**
+ * Insert the event into the specified bucketKey
+ *
+ * @param bucketKey
+ * @param event
+ */
+ public void newEvent(long bucketKey, T event)
+ {
+ int bucketIdx = (int)(bucketKey % noOfBuckets);
+
+ Bucket<T> bucket = buckets[bucketIdx];
+
+ if (bucket == null || bucket.bucketKey != bucketKey) {
+ // If the bucket is already present then the bucket is expirable
+ if (bucket != null) {
+ dirtyBuckets.put(bucket.bucketKey, bucket);
+ }
+ bucket = createBucket(bucketKey);
+ buckets[bucketIdx] = bucket;
+ }
+
+ // Insert the key into the key2Buckets map
+ Object key = event.getEventKey();
+ Set<Long> keyBuckets = key2Buckets.get(key);
+ if (keyBuckets == null) {
+ keyBuckets = new HashSet<Long>();
+ keyBuckets.add(bucketKey);
+ key2Buckets.put(key, keyBuckets);
+ } else {
+ keyBuckets.add(bucketKey);
+ }
+ bucket.addNewEvent(key, event);
+ }
+
+ /**
+ * Delete the expired buckets at every bucketSpanInMillis periodically
+ */
+ public void startService()
+ {
+ bucketSlidingTimer = new Timer();
+ endOBucketsInMillis = expiryTimeInMillis + (noOfBuckets * bucketSpanInMillis);
+ logger.debug("bucket properties {}, {}", spanTimeInMillis, bucketSpanInMillis);
+ logger.debug("bucket time params: start {}, end {}", startOfBucketsInMillis, endOBucketsInMillis);
+
+ bucketSlidingTimer.scheduleAtFixedRate(new TimerTask()
+ {
+ @Override
+ public void run()
+ {
+ long time = 0;
+ synchronized (lock) {
+ time = (expiryTimeInMillis += bucketSpanInMillis);
+ endOBucketsInMillis += bucketSpanInMillis;
+ }
+ deleteExpiredBuckets(time);
+ }
+ }, bucketSpanInMillis, bucketSpanInMillis);
+ }
+
+ /**
+ * Remove the expired buckets.
+ *
+ * @param time
+ */
+ void deleteExpiredBuckets(long time)
+ {
+ Iterator<Long> iterator = dirtyBuckets.keySet().iterator();
+ for (; iterator.hasNext(); ) {
+ long key = iterator.next();
+ Bucket t = dirtyBuckets.get(key);
+ if (startOfBucketsInMillis + (t.bucketKey * bucketSpanInMillis) < time) {
+ deleteBucket(t);
+ iterator.remove();
+ }
+ }
+ }
+
+ /**
+ * Return the unmatched events which are present in the expired buckets
+ *
+ * @return the list of unmatched events
+ */
+ public List<T> getUnmatchedEvents()
+ {
+ List<T> copyEvents = new ArrayList<T>(unmatchedEvents);
+ unmatchedEvents.clear();
+ return copyEvents;
+ }
+
+ /**
+ * Delete the given bucket
+ *
+ * @param bucket
+ */
+ private void deleteBucket(Bucket bucket)
+ {
+ if (bucket == null) {
+ return;
+ }
+ Map<Object, List<T>> writtens = bucket.getEvents();
+ if (writtens == null) {
+ return;
+ }
+
+ for (Map.Entry<Object, List<T>> e : writtens.entrySet()) {
+ // Check the events which are unmatched and add those into the unmatchedEvents list
+ if (isOuter) {
+ for (T event : e.getValue()) {
+ if (!event.isMatch()) {
+ unmatchedEvents.add(event);
+ }
+ }
+ }
+ key2Buckets.get(e.getKey()).remove(bucket.bucketKey);
+ if (key2Buckets.get(e.getKey()).size() == 0) {
+ key2Buckets.remove(e.getKey());
+ }
+ }
+ }
+
+ /**
+ * Create the bucket with the given key
+ *
+ * @param bucketKey
+ * @return the bucket for the given key
+ */
+ protected Bucket<T> createBucket(long bucketKey)
+ {
+ return new Bucket<T>(bucketKey);
+ }
+
+ public void shutdown()
+ {
+ bucketSlidingTimer.cancel();
+ }
+
+ public void isOuterJoin(boolean isOuter)
+ {
+ this.isOuter = isOuter;
+ }
+
+ public long getSpanTimeInMillis()
+ {
+ return spanTimeInMillis;
+ }
+
+ public void setSpanTimeInMillis(long spanTimeInMillis)
+ {
+ this.spanTimeInMillis = spanTimeInMillis;
+ }
+
+ public int getBucketSpanInMillis()
+ {
+ return bucketSpanInMillis;
+ }
+
+ public void setBucketSpanInMillis(int bucketSpanInMillis)
+ {
+ this.bucketSpanInMillis = bucketSpanInMillis;
+ }
+
+ private static class Lock
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/TimeEvent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/TimeEvent.java b/library/src/main/java/com/datatorrent/lib/join/TimeEvent.java
new file mode 100644
index 0000000..d1259d2
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/TimeEvent.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+ /**
+  * An event that carries a time, a key and a value, plus a flag recording
+  * whether the event has been matched.
+  */
+ @InterfaceStability.Unstable
+ public interface TimeEvent
+ {
+ /**
+ * Returns the time of the event
+ *
+ * @return the time of event
+ */
+ long getTime();
+
+ /**
+ * Returns the key of the event
+ *
+ * @return the key
+ */
+ Object getEventKey();
+
+ /**
+ * Returns the value carried by the event
+ *
+ * @return the value
+ */
+ Object getValue();
+
+ /**
+ * Returns whether the event has matched tuples or not
+ *
+ * @return whether the event has matched tuples or not
+ */
+ boolean isMatch();
+
+ /**
+ * Sets the flag that is reported by {@link #isMatch()}
+ *
+ * @param match the new value of the match flag
+ */
+ void setMatch(boolean match);
+ }
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/TimeEventImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/TimeEventImpl.java b/library/src/main/java/com/datatorrent/lib/join/TimeEventImpl.java
new file mode 100644
index 0000000..bafab29
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/TimeEventImpl.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import javax.annotation.Nonnull;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Time event Implementation.
+ */
+@InterfaceStability.Unstable
+public class TimeEventImpl implements TimeEvent, Comparable<TimeEventImpl>
+{
+ protected Object key;
+ protected long time;
+ protected Object tuple;
+ protected boolean match;
+
+ @SuppressWarnings("unused")
+ public TimeEventImpl()
+ {
+ }
+
+ public TimeEventImpl(Object key, long time, Object tuple)
+ {
+ this.key = key;
+ this.time = time;
+ this.tuple = tuple;
+ this.match = false;
+ }
+
+ @Override
+ public long getTime()
+ {
+ return time;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof TimeEventImpl)) {
+ return false;
+ }
+
+ TimeEventImpl that = (TimeEventImpl)o;
+
+ return time == that.time && !(key != null ? !key.equals(that.key) : that.key != null) &&
+ !(tuple != null ? !tuple.equals(that.tuple) : that.tuple != null);
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = key != null ? key.hashCode() : 0;
+ result = 31 * result + (int)(time ^ (time >>> 32));
+ return result;
+ }
+
+ @Override
+ public Object getEventKey()
+ {
+ return key;
+ }
+
+ @Override
+ public int compareTo(@Nonnull TimeEventImpl dummyEvent)
+ {
+ if (key.equals(dummyEvent.key)) {
+ return 0;
+ }
+ return -1;
+ }
+
+ public Object getValue()
+ {
+ return tuple;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TimeEvent{" +
+ "key=" + key +
+ ", time=" + time +
+ ", tuple=" + tuple +
+ ", match=" + match +
+ '}';
+ }
+
+ public boolean isMatch()
+ {
+ return match;
+ }
+
+ public void setMatch(boolean match)
+ {
+ this.match = match;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/test/java/com/datatorrent/lib/join/MapTimeBasedJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/join/MapTimeBasedJoinOperator.java b/library/src/test/java/com/datatorrent/lib/join/MapTimeBasedJoinOperator.java
new file mode 100644
index 0000000..391b37d
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/join/MapTimeBasedJoinOperator.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+ /**
+ * Test of the join operator with Map tuples: feeds customer maps on input1 and
+ * order maps on input2 and checks the single joined result.
+ * NOTE(review): the class name lacks the usual Test suffix - confirm the build
+ * actually picks this class up as a test.
+ */
+ public class MapTimeBasedJoinOperator
+ {
+ @Rule
+ public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo();
+ private static Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ // Shared operator context; also used by POJOTimeBasedJoinOperatorTest.
+ public static final Context.OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+
+ @Test
+ public void testJoinOperator() throws IOException, InterruptedException
+ {
+
+ AbstractJoinOperator oper = new MapJoinOperator();
+ oper.setLeftStore(new InMemoryStore(200, 200));
+ oper.setRightStore(new InMemoryStore(200, 200));
+ // Output fields: ID,Name from the first stream; OID,Amount from the second.
+ oper.setIncludeFields("ID,Name;OID,Amount");
+ // Join key: ID on the first stream, CID on the second.
+ oper.setKeyFields("ID,CID");
+
+ oper.setup(context);
+
+ CollectorTestSink<List<Map<String, Object>>> sink = new CollectorTestSink<List<Map<String, Object>>>();
+ @SuppressWarnings({"unchecked", "rawtypes"}) CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+ oper.outputPort.setSink(tmp);
+
+ oper.beginWindow(0);
+ Map<String, Object> tuple1 = Maps.newHashMap();
+ tuple1.put("ID", 1);
+ tuple1.put("Name", "Anil");
+
+ oper.input1.process(tuple1);
+
+ // The latch is never counted down; await() is only used as a timed wait.
+ CountDownLatch latch = new CountDownLatch(1);
+ // order1 has CID 1, the same value as tuple1's ID.
+ Map<String, Object> order1 = Maps.newHashMap();
+ order1.put("OID", 102);
+ order1.put("CID", 1);
+ order1.put("Amount", 300);
+
+ oper.input2.process(order1);
+
+ // order2 has CID 3, which no customer tuple in this test carries.
+ Map<String, Object> order2 = Maps.newHashMap();
+ order2.put("OID", 103);
+ order2.put("CID", 3);
+ order2.put("Amount", 300);
+
+ oper.input2.process(order2);
+ latch.await(200, TimeUnit.MILLISECONDS);
+ oper.endWindow();
+
+ oper.beginWindow(1);
+ Map<String, Object> tuple2 = Maps.newHashMap();
+ tuple2.put("ID", 4);
+ tuple2.put("Name", "DT");
+ oper.input1.process(tuple2);
+
+ Map<String, Object> order3 = Maps.newHashMap();
+ order3.put("OID", 104);
+ order3.put("CID", 1);
+ order3.put("Amount", 300);
+
+ // NOTE(review): order3 is built above but never sent; order2 is processed a
+ // second time instead. Confirm whether order3 was intended here.
+ oper.input2.process(order2);
+
+ latch.await(200, TimeUnit.MILLISECONDS);
+
+ oper.endWindow();
+
+ /* Number of tuple, emitted */
+ Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size());
+ List<Map<String, Object>> emittedList = sink.collectedTuples.iterator().next();
+ Assert.assertEquals("Size of Joined Tuple ", 1, emittedList.size());
+ Map<String, Object> emitted = emittedList.get(0);
+
+ /* The fields present in original event is kept as it is */
+ Assert.assertEquals("Number of fields in emitted tuple", 4, emitted.size());
+ Assert.assertEquals("value of ID :", tuple1.get("ID"), emitted.get("ID"));
+ Assert.assertEquals("value of Name :", tuple1.get("Name"), emitted.get("Name"));
+
+ Assert.assertEquals("value of OID: ", order1.get("OID"), emitted.get("OID"));
+ Assert.assertEquals("value of Amount: ", order1.get("Amount"), emitted.get("Amount"));
+
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/test/java/com/datatorrent/lib/join/POJOTimeBasedJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/join/POJOTimeBasedJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/join/POJOTimeBasedJoinOperatorTest.java
new file mode 100644
index 0000000..6914da0
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/join/POJOTimeBasedJoinOperatorTest.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+ /**
+ * Tests of POJOJoinOperator for the inner, left outer, right outer and full
+ * outer join strategies, using Customer tuples on input1 and Order tuples on
+ * input2 that are joined into CustOrder objects.
+ */
+ public class POJOTimeBasedJoinOperatorTest
+ {
+
+ @Rule
+ public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo();
+
+ /** Tuple type fed to input1; ID is the join key on this side. */
+ public class Customer
+ {
+ public int ID;
+ public String Name;
+
+ public Customer()
+ {
+
+ }
+
+ public Customer(int ID, String name)
+ {
+ this.ID = ID;
+ Name = name;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Customer{" +
+ "ID=" + ID +
+ ", Name='" + Name + '\'' +
+ '}';
+ }
+ }
+
+ /** Tuple type fed to input2; CID is the join key on this side. */
+ public class Order
+ {
+ public int OID;
+ public int CID;
+ public int Amount;
+
+ public Order()
+ {
+ }
+
+ public Order(int OID, int CID, int amount)
+ {
+ this.OID = OID;
+ this.CID = CID;
+ Amount = amount;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Order{" +
+ "OID=" + OID +
+ ", CID=" + CID +
+ ", Amount=" + Amount +
+ '}';
+ }
+ }
+
+ /** Output type of the join; its toString() format is asserted by the outer join tests. */
+ public static class CustOrder
+ {
+ public int ID;
+ public String Name;
+ public int OID;
+ public int Amount;
+
+ public CustOrder()
+ {
+ }
+
+ @Override
+ public String toString()
+ {
+ return "{" +
+ "ID=" + ID +
+ ", Name='" + Name + '\'' +
+ ", OID=" + OID +
+ ", Amount=" + Amount +
+ '}';
+ }
+ }
+
+ /**
+ * Default strategy: expects exactly one emitted list whose first element
+ * combines the customer with ID 1 and the order with CID 1.
+ */
+ @Test
+ public void testInnerJoinOperator() throws IOException, InterruptedException
+ {
+ Kryo kryo = new Kryo();
+ POJOJoinOperator oper = new POJOJoinOperator();
+ JoinStore store = new InMemoryStore(200, 200);
+ // Each side gets its own copy of the store.
+ oper.setLeftStore(kryo.copy(store));
+ oper.setRightStore(kryo.copy(store));
+ oper.setIncludeFields("ID,Name;OID,Amount");
+ oper.setKeyFields("ID,CID");
+ oper.outputClass = CustOrder.class;
+
+ oper.setup(MapTimeBasedJoinOperator.context);
+
+ CollectorTestSink<List<CustOrder>> sink = new CollectorTestSink<List<CustOrder>>();
+ @SuppressWarnings({"unchecked", "rawtypes"}) CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+ oper.outputPort.setSink(tmp);
+
+ oper.beginWindow(0);
+
+ Customer tuple = new Customer(1, "Anil");
+
+ oper.input1.process(tuple);
+
+ // The latch is never counted down; await() is only used as a timed wait.
+ CountDownLatch latch = new CountDownLatch(1);
+
+ // Only this order carries the customer's ID (1) as CID.
+ Order order = new Order(102, 1, 300);
+
+ oper.input2.process(order);
+
+ Order order2 = new Order(103, 3, 300);
+ oper.input2.process(order2);
+
+ Order order3 = new Order(104, 7, 300);
+ oper.input2.process(order3);
+
+ latch.await(3000, TimeUnit.MILLISECONDS);
+
+ oper.endWindow();
+
+ /* Number of tuple, emitted */
+ Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size());
+ List<CustOrder> emittedList = sink.collectedTuples.iterator().next();
+ CustOrder emitted = emittedList.get(0);
+
+ Assert.assertEquals("value of ID :", tuple.ID, emitted.ID);
+ Assert.assertEquals("value of Name :", tuple.Name, emitted.Name);
+
+ Assert.assertEquals("value of OID: ", order.OID, emitted.OID);
+ Assert.assertEquals("value of Amount: ", order.Amount, emitted.Amount);
+
+ }
+
+ /**
+ * Strategy "left_outer_join": expects two emitted lists - first the joined
+ * tuple2/order3 pair, then tuple1 with the order fields left at their defaults.
+ */
+ @Test
+ public void testLeftOuterJoinOperator() throws IOException, InterruptedException
+ {
+ Kryo kryo = new Kryo();
+ POJOJoinOperator oper = new POJOJoinOperator();
+ JoinStore store = new InMemoryStore(200, 200);
+ oper.setLeftStore(kryo.copy(store));
+ oper.setRightStore(kryo.copy(store));
+ oper.setIncludeFields("ID,Name;OID,Amount");
+ oper.setKeyFields("ID,CID");
+ oper.outputClass = CustOrder.class;
+ oper.setStrategy("left_outer_join");
+
+ oper.setup(MapTimeBasedJoinOperator.context);
+
+ CollectorTestSink<List<CustOrder>> sink = new CollectorTestSink<List<CustOrder>>();
+ @SuppressWarnings({"unchecked", "rawtypes"}) CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+ oper.outputPort.setSink(tmp);
+
+ oper.beginWindow(0);
+
+ // Window 0: none of the orders carries this customer's ID as CID.
+ Customer tuple1 = new Customer(1, "Anil");
+
+ oper.input1.process(tuple1);
+
+ // The latch is never counted down; await() is only used as a timed wait.
+ CountDownLatch latch = new CountDownLatch(1);
+
+ Order order = new Order(102, 3, 300);
+
+ oper.input2.process(order);
+
+ Order order2 = new Order(103, 7, 300);
+ oper.input2.process(order2);
+
+ oper.endWindow();
+
+ latch.await(500, TimeUnit.MILLISECONDS);
+
+ // Window 1: order3 (CID 5) and tuple2 (ID 5) share a key.
+ oper.beginWindow(1);
+ Order order3 = new Order(104, 5, 300);
+ oper.input2.process(order3);
+
+ Customer tuple2 = new Customer(5, "DT");
+
+ oper.input1.process(tuple2);
+
+ latch.await(500, TimeUnit.MILLISECONDS);
+
+ oper.endWindow();
+ latch.await(500, TimeUnit.MILLISECONDS);
+ // Window 2 is empty; presumably it lets the operator emit unmatched tuples - TODO confirm.
+ oper.beginWindow(2);
+ oper.endWindow();
+ latch.await(5000, TimeUnit.MILLISECONDS);
+
+
+ /* Number of tuple, emitted */
+ Assert.assertEquals("Number of tuple emitted ", 2, sink.collectedTuples.size());
+ Iterator<List<CustOrder>> ite = sink.collectedTuples.iterator();
+ List<CustOrder> emittedList = ite.next();
+ CustOrder emitted = emittedList.get(0);
+
+ Assert.assertEquals("value of ID :", tuple2.ID, emitted.ID);
+ Assert.assertEquals("value of Name :", tuple2.Name, emitted.Name);
+
+ Assert.assertEquals("value of OID: ", order3.OID, emitted.OID);
+ Assert.assertEquals("value of Amount: ", order3.Amount, emitted.Amount);
+
+ // Second list: the unmatched customer, order fields at their defaults.
+ emittedList = ite.next();
+ emitted = emittedList.get(0);
+ Assert.assertEquals("Joined Tuple ", "{ID=1, Name='Anil', OID=0, Amount=0}", emitted.toString());
+ }
+
+ /**
+ * Strategy "right_outer_join": expects two emitted lists - first the joined
+ * tuple2/order3 pair, then the two unmatched orders with the customer fields
+ * left at their defaults.
+ */
+ @Test
+ public void testRightOuterJoinOperator() throws IOException, InterruptedException
+ {
+ Kryo kryo = new Kryo();
+ POJOJoinOperator oper = new POJOJoinOperator();
+ JoinStore store = new InMemoryStore(200, 200);
+ oper.setLeftStore(kryo.copy(store));
+ oper.setRightStore(kryo.copy(store));
+ oper.setIncludeFields("ID,Name;OID,Amount");
+ oper.setKeyFields("ID,CID");
+ oper.outputClass = CustOrder.class;
+ oper.setStrategy("right_outer_join");
+
+ oper.setup(MapTimeBasedJoinOperator.context);
+
+ CollectorTestSink<List<CustOrder>> sink = new CollectorTestSink<List<CustOrder>>();
+ @SuppressWarnings({"unchecked", "rawtypes"}) CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+ oper.outputPort.setSink(tmp);
+
+ oper.beginWindow(0);
+
+ // Window 0: none of the orders carries this customer's ID as CID.
+ Customer tuple1 = new Customer(1, "Anil");
+
+ oper.input1.process(tuple1);
+
+ // The latch is never counted down; await() is only used as a timed wait.
+ CountDownLatch latch = new CountDownLatch(1);
+
+ Order order = new Order(102, 3, 300);
+
+ oper.input2.process(order);
+
+ Order order2 = new Order(103, 7, 300);
+ oper.input2.process(order2);
+ oper.endWindow();
+
+ latch.await(500, TimeUnit.MILLISECONDS);
+
+ // Window 1: order3 (CID 5) and tuple2 (ID 5) share a key.
+ oper.beginWindow(1);
+ Order order3 = new Order(104, 5, 300);
+ oper.input2.process(order3);
+
+ Customer tuple2 = new Customer(5, "DT");
+ oper.input1.process(tuple2);
+
+ latch.await(500, TimeUnit.MILLISECONDS);
+
+ oper.endWindow();
+ latch.await(500, TimeUnit.MILLISECONDS);
+ // Window 2 is empty; presumably it lets the operator emit unmatched tuples - TODO confirm.
+ oper.beginWindow(2);
+ oper.endWindow();
+ latch.await(5000, TimeUnit.MILLISECONDS);
+
+ /* Number of tuple, emitted */
+ Assert.assertEquals("Number of tuple emitted ", 2, sink.collectedTuples.size());
+ Iterator<List<CustOrder>> ite = sink.collectedTuples.iterator();
+ List<CustOrder> emittedList = ite.next();
+ CustOrder emitted = emittedList.get(0);
+
+ Assert.assertEquals("value of ID :", tuple2.ID, emitted.ID);
+ Assert.assertEquals("value of Name :", tuple2.Name, emitted.Name);
+
+ Assert.assertEquals("value of OID: ", order3.OID, emitted.OID);
+ Assert.assertEquals("value of Amount: ", order3.Amount, emitted.Amount);
+
+ // Second list: the two unmatched orders, customer fields at their defaults.
+ emittedList = ite.next();
+ Assert.assertEquals("Joined Tuple ", "{ID=0, Name='null', OID=102, Amount=300}", emittedList.get(0).toString());
+ Assert.assertEquals("Joined Tuple ", "{ID=0, Name='null', OID=103, Amount=300}", emittedList.get(1).toString());
+ }
+
+ /**
+ * Strategy "outer_join": expects three emitted lists - the joined
+ * tuple2/order3 pair, then the unmatched customer, then the two unmatched orders.
+ */
+ @Test
+ public void testFullOuterJoinOperator() throws IOException, InterruptedException
+ {
+ Kryo kryo = new Kryo();
+ POJOJoinOperator oper = new POJOJoinOperator();
+ JoinStore store = new InMemoryStore(200, 200);
+ oper.setLeftStore(kryo.copy(store));
+ oper.setRightStore(kryo.copy(store));
+ oper.setIncludeFields("ID,Name;OID,Amount");
+ oper.setKeyFields("ID,CID");
+ oper.outputClass = CustOrder.class;
+ oper.setStrategy("outer_join");
+
+ oper.setup(MapTimeBasedJoinOperator.context);
+
+ CollectorTestSink<List<CustOrder>> sink = new CollectorTestSink<List<CustOrder>>();
+ @SuppressWarnings({"unchecked", "rawtypes"}) CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+ oper.outputPort.setSink(tmp);
+
+ oper.beginWindow(0);
+
+ // Window 0: none of the orders carries this customer's ID as CID.
+ Customer tuple1 = new Customer(1, "Anil");
+
+ oper.input1.process(tuple1);
+
+ // The latch is never counted down; await() is only used as a timed wait.
+ CountDownLatch latch = new CountDownLatch(1);
+
+ Order order = new Order(102, 3, 300);
+
+ oper.input2.process(order);
+
+ Order order2 = new Order(103, 7, 300);
+ oper.input2.process(order2);
+ oper.endWindow();
+
+ latch.await(500, TimeUnit.MILLISECONDS);
+
+ // Window 1: order3 (CID 5) and tuple2 (ID 5) share a key.
+ oper.beginWindow(1);
+ Order order3 = new Order(104, 5, 300);
+ oper.input2.process(order3);
+
+ Customer tuple2 = new Customer(5, "DT");
+ oper.input1.process(tuple2);
+
+ latch.await(500, TimeUnit.MILLISECONDS);
+
+ oper.endWindow();
+ latch.await(500, TimeUnit.MILLISECONDS);
+ // Window 2 is empty; presumably it lets the operator emit unmatched tuples - TODO confirm.
+ oper.beginWindow(2);
+ oper.endWindow();
+ latch.await(5000, TimeUnit.MILLISECONDS);
+
+ /* Number of tuple, emitted */
+ Assert.assertEquals("Number of tuple emitted ", 3, sink.collectedTuples.size());
+ Iterator<List<CustOrder>> ite = sink.collectedTuples.iterator();
+ List<CustOrder> emittedList = ite.next();
+ CustOrder emitted = emittedList.get(0);
+
+ Assert.assertEquals("value of ID :", tuple2.ID, emitted.ID);
+ Assert.assertEquals("value of Name :", tuple2.Name, emitted.Name);
+
+ Assert.assertEquals("value of OID: ", order3.OID, emitted.OID);
+ Assert.assertEquals("value of Amount: ", order3.Amount, emitted.Amount);
+
+ // Second list: the unmatched customer, order fields at their defaults.
+ emittedList = ite.next();
+ Assert.assertEquals("Joined Tuple ", "{ID=1, Name='Anil', OID=0, Amount=0}", emittedList.get(0).toString());
+
+ // Third list: the two unmatched orders, customer fields at their defaults.
+ emittedList = ite.next();
+ Assert.assertEquals("Joined Tuple ", "{ID=0, Name='null', OID=102, Amount=300}", emittedList.get(0).toString());
+ Assert.assertEquals("Joined Tuple ", "{ID=0, Name='null', OID=103, Amount=300}", emittedList.get(1).toString());
+ }
+
+ }
[2/2] incubator-apex-malhar git commit: Merge branch
'SPOI-4520-JoinOp' into devel-3
Posted by cs...@apache.org.
Merge branch 'SPOI-4520-JoinOp' into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/fd2f42bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/fd2f42bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/fd2f42bd
Branch: refs/heads/devel-3
Commit: fd2f42bd926d46f07d5717b2fcb614c6cf2abff5
Parents: 8331f56 fc547d8
Author: Chandni Singh <cs...@apache.org>
Authored: Fri Feb 26 15:17:51 2016 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Fri Feb 26 15:17:51 2016 -0800
----------------------------------------------------------------------
.../lib/join/AbstractJoinOperator.java | 435 +++++++++++++++++++
.../java/com/datatorrent/lib/join/Bucket.java | 91 ++++
.../com/datatorrent/lib/join/InMemoryStore.java | 107 +++++
.../com/datatorrent/lib/join/JoinStore.java | 91 ++++
.../datatorrent/lib/join/MapJoinOperator.java | 100 +++++
.../datatorrent/lib/join/POJOJoinOperator.java | 266 ++++++++++++
.../datatorrent/lib/join/TimeBasedStore.java | 333 ++++++++++++++
.../com/datatorrent/lib/join/TimeEvent.java | 50 +++
.../com/datatorrent/lib/join/TimeEventImpl.java | 121 ++++++
.../lib/join/MapTimeBasedJoinOperator.java | 118 +++++
.../lib/join/POJOTimeBasedJoinOperatorTest.java | 385 ++++++++++++++++
11 files changed, 2097 insertions(+)
----------------------------------------------------------------------