Posted to dev@apex.apache.org by chaithu14 <gi...@git.apache.org> on 2015/11/25 13:13:49 UTC

[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

GitHub user chaithu14 opened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113

    SPOI-4520 Implemented Inmemory Join Operator. Supported Inner, LeftOuter, RightOuter and FullOuter join types
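
    For orientation, here is a minimal sketch of how an operator like this might be wired into an application, based only on the JoinStrategy values and the properties (keyFields, includeFieldStr, timeFields, strategy) listed in the diffs below. The concrete subclass name and the setter methods are assumptions for illustration, not part of the patch:

        import org.apache.hadoop.conf.Configuration;

        import com.datatorrent.api.DAG;
        import com.datatorrent.api.StreamingApplication;
        import com.datatorrent.lib.join.JoinStrategy;  // package assumed from the diff location

        public class JoinExampleApp implements StreamingApplication
        {
          @Override
          public void populateDAG(DAG dag, Configuration conf)
          {
            // MapJoinOperator is a hypothetical concrete subclass of AbstractJoinOperator<T>.
            MapJoinOperator join = dag.addOperator("Join", new MapJoinOperator());
            join.setStrategy(JoinStrategy.LEFT_OUTER_JOIN);        // default is INNER_JOIN
            join.setKeyFields("customerId,custId");                // one key field per input stream
            join.setIncludeFieldStr("name,phone;orderId,amount");  // left fields ; right fields
            join.setTimeFields("eventTime,orderTime");             // optional; system time is used otherwise
            // dag.addStream(...) would connect the two upstream operators to join.input1 and
            // join.input2, and join.outputPort to a downstream sink.
          }
        }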

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar SPOI-4520-JoinOp

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/113.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #113
    
----
commit 2822853215f85a1d0382d58b72ada5458dcb0e5a
Author: Chaitanya <ch...@datatorrent.com>
Date:   2015-11-25T12:09:22Z

    SPOI-4520 Implemented Inmemory Join Operator. Supported Inner, LeftOuter, RightOuter and FullOuter join types

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45944418
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,343 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = true;
    +      processTuple(tuple);
    +    }
    +  };
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = false;
    +      processTuple(tuple);
    +    }
    +  };
    +  // Stores for each of the input port
    +  private BackupStore[] store = (BackupStore[])Array.newInstance(BackupStore.class, 2);
    +  private String includeFieldStr;
    +  private String keyFieldStr;
    +  private String timeFieldStr;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store[0] == null) {
    +      throw new RuntimeException("Left Store is Empty");
    +    }
    +    if (store[1] == null) {
    +      throw new RuntimeException("Right Store is Empty");
    +    }
    +    // Checks whether the strategy is outer join and set it to store
    +    boolean isOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[0].isOuterJoin(isOuter);
    +    isOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[1].isOuterJoin(isOuter);
    +    // Setup the stores
    +    store[0].setup();
    +    store[1].setup();
    +
    +    populateFields();
    +  }
    +
    +  /**
    +   * Create the event with the given tuple. If it successfully inserted it into the store
    +   * then it does the join operation
    +   *
    +   * @param tuple Tuple to process
    +   */
    +  protected void processTuple(T tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    TimeEvent t = createEvent(tuple);
    +    if (store[idx].put(t)) {
    +      join(t, isLeft);
    +    }
    +  }
    +
    +  private void populateFields()
    +  {
    +    populateIncludeFields();
    +    populateKeyFields();
    +    if (timeFieldStr != null) {
    +      populateTimeFields();
    +    }
    +  }
    +
    +  /**
    +   * Populate the fields from the includeFiledStr
    +   */
    +  private void populateIncludeFields()
    +  {
    +    includeFields = new String[2][];
    +    String[] portFields = includeFieldStr.split(";");
    +    for (int i = 0; i < portFields.length; i++) {
    +      includeFields[i] = portFields[i].split(",");
    +    }
    +  }
    +
    +  /**
    +   * Get the tuples from another store based on join constraint and key
    +   *
    +   * @param tuple  input
    +   * @param isLeft whether the given tuple is from first port or not
    +   */
    +  private void join(TimeEvent tuple, Boolean isLeft)
    +  {
    +    // Get the valid tuples from the store based on key
    +    // If the tuple is null means the join type is outer and return unmatched tuples from store.
    +    Object value;
    +    if (isLeft) {
    +      if (tuple != null) {
    +        value = store[1].getValidTuples(tuple);
    +      } else {
    +        value = store[1].getUnMatchedTuples();
    +      }
    +    } else {
    +      if (tuple != null) {
    +        value = store[0].getValidTuples(tuple);
    +      } else {
    +        value = store[0].getUnMatchedTuples();
    +      }
    +    }
    +    // Join the input tuple with the joined tuples
    +    if (value != null) {
    +      ArrayList<TimeEvent> joinedValues = (ArrayList<TimeEvent>)value;
    +      List<T> result = new ArrayList<>();
    +      for (TimeEvent joinedValue : joinedValues) {
    +        T output = createOutputTuple();
    +        Object tupleValue = null;
    +        if (tuple != null) {
    +          tupleValue = tuple.getValue();
    +        }
    +        copyValue(output, tupleValue, isLeft);
    +        copyValue(output, joinedValue.getValue(), !isLeft);
    +        result.add(output);
    +        joinedValue.setMatch(true);
    +      }
    +      if (tuple != null) {
    +        tuple.setMatch(true);
    +      }
    +      if (result.size() != 0) {
    +        outputPort.emit(result);
    +      }
    +    }
    +  }
    +
    +  // Emit the unmatched tuples, if the strategy is outer join
    +  @Override
    --- End diff --
    
    Is the join operator defined within window boundaries?



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45944127
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/TimeBasedStore.java ---
    @@ -0,0 +1,333 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import javax.validation.constraints.Min;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +/**
    + * Base implementation of time based store for key-value pair tuples.
    + *
    + * @param <T>
    + */
    +@InterfaceStability.Evolving
    +public class TimeBasedStore<T extends TimeEvent>
    +{
    --- End diff --
    
    Should TimeBasedStore implement BackupStore, since it is a store?



[GitHub] incubator-apex-malhar pull request: MLHR-1720 Implemented Inmemory...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r46123624
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/TimeBasedStore.java ---
    @@ -0,0 +1,333 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import javax.validation.constraints.Min;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +/**
    + * Base implementation of time based store for key-value pair tuples.
    + *
    + * @param <T>
    + */
    +@InterfaceStability.Evolving
    +public class TimeBasedStore<T extends TimeEvent>
    +{
    --- End diff --
    
    No



[GitHub] incubator-apex-malhar pull request: MLHR-1720 Implemented Inmemory...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r46114102
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + * @since 3.3.0
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = true;
    +      processTuple(tuple);
    +    }
    +  };
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = false;
    +      processTuple(tuple);
    +    }
    +  };
    +  // Stores for each of the input port
    +  private BackupStore[] store = (BackupStore[])Array.newInstance(BackupStore.class, 2);
    +  private String includeFieldStr;
    +  private String keyFieldStr;
    +  private String timeFieldStr;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store[0] == null) {
    +      throw new RuntimeException("Left Store is Empty");
    +    }
    +    if (store[1] == null) {
    +      throw new RuntimeException("Right Store is Empty");
    +    }
    +    // Checks whether the strategy is outer join and set it to store
    +    boolean isOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[0].isOuterJoin(isOuter);
    +    isOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[1].isOuterJoin(isOuter);
    +    // Setup the stores
    +    store[0].setup();
    +    store[1].setup();
    +
    +    populateFields();
    +  }
    +
    +  /**
    +   * Create the event with the given tuple. If it successfully inserted it into the store
    +   * then it does the join operation
    +   *
    +   * @param tuple Tuple to process
    +   */
    +  protected void processTuple(T tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    TimeEvent t = createEvent(tuple);
    +    if (store[idx].put(t)) {
    +      join(t, isLeft);
    +    }
    +  }
    +
    +  private void populateFields()
    +  {
    +    populateIncludeFields();
    +    populateKeyFields();
    +    if (timeFieldStr != null) {
    +      populateTimeFields();
    +    }
    +  }
    +
    +  /**
    +   * Populate the fields from the includeFiledStr
    +   */
    +  private void populateIncludeFields()
    +  {
    +    includeFields = new String[2][];
    +    String[] portFields = includeFieldStr.split(";");
    +    for (int i = 0; i < portFields.length; i++) {
    +      includeFields[i] = portFields[i].split(",");
    +    }
    +  }
    +
    +  /**
    +   * Get the tuples from another store based on join constraint and key
    +   *
    +   * @param tuple  input
    +   * @param isLeft whether the given tuple is from first port or not
    +   */
    +  private void join(TimeEvent tuple, Boolean isLeft)
    +  {
    +    // Get the valid tuples from the store based on key
    +    // If the tuple is null means the join type is outer and return unmatched tuples from store.
    +    Object value;
    +    if (isLeft) {
    +      if (tuple != null) {
    +        value = store[1].getValidTuples(tuple);
    +      } else {
    +        value = store[1].getUnMatchedTuples();
    +      }
    +    } else {
    +      if (tuple != null) {
    +        value = store[0].getValidTuples(tuple);
    +      } else {
    +        value = store[0].getUnMatchedTuples();
    +      }
    +    }
    +    // Join the input tuple with the joined tuples
    +    if (value != null) {
    +      ArrayList<TimeEvent> joinedValues = (ArrayList<TimeEvent>)value;
    +      List<T> result = new ArrayList<>();
    +      for (TimeEvent joinedValue : joinedValues) {
    +        T output = createOutputTuple();
    +        Object tupleValue = null;
    +        if (tuple != null) {
    +          tupleValue = tuple.getValue();
    +        }
    +        copyValue(output, tupleValue, isLeft);
    +        copyValue(output, joinedValue.getValue(), !isLeft);
    +        result.add(output);
    +        joinedValue.setMatch(true);
    +      }
    +      if (tuple != null) {
    +        tuple.setMatch(true);
    +      }
    +      if (result.size() != 0) {
    +        outputPort.emit(result);
    +      }
    +    }
    +  }
    +
    +  // Emit the unmatched tuples, if the strategy is outer join
    +  @Override
    +  public void endWindow()
    +  {
    +    if (strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, false);
    +    }
    +    if (strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, true);
    +    }
    +    store[0].endWindow();
    +    store[1].endWindow();
    +  }
    +
    +  @Override
    +  public void checkpointed(long windowId)
    +  {
    +    store[0].checkpointed(windowId);
    +    store[1].checkpointed(windowId);
    +  }
    +
    +  @Override
    +  public void committed(long windowId)
    +  {
    +    store[0].committed(windowId);
    +    store[1].committed(windowId);
    +  }
    +
    +  /**
    +   * Convert the given tuple to event
    +   *
    +   * @param tuple Given tuple to convert into event
    +   * @return event
    +   */
    +  protected TimeEvent createEvent(Object tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    if (timeFields != null) {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), (Long)getTime(timeFields[idx], tuple), tuple);
    +    } else {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), Calendar.getInstance().getTimeInMillis(), tuple);
    --- End diff --
    
    The keys and timeFields arrays have a size of only 2, one entry per input stream.



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r46111312
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    --- End diff --
    
    Please add this to java docs :)



[GitHub] incubator-apex-malhar pull request: MLHR-1720 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r46114285
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + * @since 3.3.0
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = true;
    +      processTuple(tuple);
    +    }
    +  };
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = false;
    +      processTuple(tuple);
    +    }
    +  };
    +  // Stores for each of the input port
    +  private BackupStore[] store = (BackupStore[])Array.newInstance(BackupStore.class, 2);
    +  private String includeFieldStr;
    +  private String keyFieldStr;
    +  private String timeFieldStr;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store[0] == null) {
    +      throw new RuntimeException("Left Store is Empty");
    +    }
    +    if (store[1] == null) {
    +      throw new RuntimeException("Right Store is Empty");
    +    }
    +    // Checks whether the strategy is outer join and set it to store
    +    boolean isOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[0].isOuterJoin(isOuter);
    +    isOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[1].isOuterJoin(isOuter);
    +    // Setup the stores
    +    store[0].setup();
    +    store[1].setup();
    +
    +    populateFields();
    +  }
    +
    +  /**
    +   * Create the event with the given tuple. If it successfully inserted it into the store
    +   * then it does the join operation
    +   *
    +   * @param tuple Tuple to process
    +   */
    +  protected void processTuple(T tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    TimeEvent t = createEvent(tuple);
    +    if (store[idx].put(t)) {
    +      join(t, isLeft);
    +    }
    +  }
    +
    +  private void populateFields()
    +  {
    +    populateIncludeFields();
    +    populateKeyFields();
    +    if (timeFieldStr != null) {
    +      populateTimeFields();
    +    }
    +  }
    +
    +  /**
    +   * Populate the fields from the includeFiledStr
    +   */
    +  private void populateIncludeFields()
    +  {
    +    includeFields = new String[2][];
    +    String[] portFields = includeFieldStr.split(";");
    +    for (int i = 0; i < portFields.length; i++) {
    +      includeFields[i] = portFields[i].split(",");
    +    }
    +  }
    +
    +  /**
    +   * Get the tuples from another store based on join constraint and key
    +   *
    +   * @param tuple  input
    +   * @param isLeft whether the given tuple is from first port or not
    +   */
    +  private void join(TimeEvent tuple, Boolean isLeft)
    +  {
    +    // Get the valid tuples from the store based on key
    +    // If the tuple is null means the join type is outer and return unmatched tuples from store.
    +    Object value;
    +    if (isLeft) {
    +      if (tuple != null) {
    +        value = store[1].getValidTuples(tuple);
    +      } else {
    +        value = store[1].getUnMatchedTuples();
    +      }
    +    } else {
    +      if (tuple != null) {
    +        value = store[0].getValidTuples(tuple);
    +      } else {
    +        value = store[0].getUnMatchedTuples();
    +      }
    +    }
    +    // Join the input tuple with the joined tuples
    +    if (value != null) {
    +      ArrayList<TimeEvent> joinedValues = (ArrayList<TimeEvent>)value;
    +      List<T> result = new ArrayList<>();
    +      for (TimeEvent joinedValue : joinedValues) {
    +        T output = createOutputTuple();
    +        Object tupleValue = null;
    +        if (tuple != null) {
    +          tupleValue = tuple.getValue();
    +        }
    +        copyValue(output, tupleValue, isLeft);
    +        copyValue(output, joinedValue.getValue(), !isLeft);
    +        result.add(output);
    +        joinedValue.setMatch(true);
    +      }
    +      if (tuple != null) {
    +        tuple.setMatch(true);
    +      }
    +      if (result.size() != 0) {
    +        outputPort.emit(result);
    +      }
    +    }
    +  }
    +
    +  // Emit the unmatched tuples, if the strategy is outer join
    +  @Override
    +  public void endWindow()
    +  {
    +    if (strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, false);
    +    }
    +    if (strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, true);
    +    }
    +    store[0].endWindow();
    +    store[1].endWindow();
    +  }
    +
    +  @Override
    +  public void checkpointed(long windowId)
    +  {
    +    store[0].checkpointed(windowId);
    +    store[1].checkpointed(windowId);
    +  }
    +
    +  @Override
    +  public void committed(long windowId)
    +  {
    +    store[0].committed(windowId);
    +    store[1].committed(windowId);
    +  }
    +
    +  /**
    +   * Convert the given tuple to event
    +   *
    +   * @param tuple Given tuple to convert into event
    +   * @return event
    +   */
    +  protected TimeEvent createEvent(Object tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    if (timeFields != null) {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), (Long)getTime(timeFields[idx], tuple), tuple);
    +    } else {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), Calendar.getInstance().getTimeInMillis(), tuple);
    --- End diff --
    
    What if I have to do a join on multiple fields?
    
    If you are not supporting joins on multiple fields, can you please mention clearly in the Javadoc that the list contains only two items... Can you also add a size check in populateKeyFields and populateTimeFields?
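    
    A minimal sketch of the kind of size check being asked for here. populateKeyFields and populateTimeFields are not shown in the quoted diff, so the bodies below simply follow the pattern of populateIncludeFields and are an assumption about how they look:
    
        private void populateKeyFields()
        {
          keys = keyFieldStr.split(",");
          if (keys.length != 2) {
            throw new IllegalArgumentException("keyFields must have exactly two comma-separated entries, "
                + "one per input stream, but was: " + keyFieldStr);
          }
        }
    
        private void populateTimeFields()
        {
          timeFields = timeFieldStr.split(",");
          if (timeFields.length != 2) {
            throw new IllegalArgumentException("timeFields must have exactly two comma-separated entries, "
                + "one per input stream, but was: " + timeFieldStr);
          }
        }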



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r46111296
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + * @since 3.3.0
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    --- End diff --
    
    Right. Will remove the optional flag.



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45864929
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/BackupStore.java ---
    @@ -0,0 +1,76 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +/**
    + * <p>
    + * Interface of store for join operation.
    + * </p>
    + */
    +@Evolving
    +public interface BackupStore
    --- End diff --
    
    can this extend com.datatorrent.api.Component??
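    
    For reference, a sketch of what that suggestion would look like. The method signatures below are inferred from the call sites in the quoted AbstractJoinOperator code, and typing the interface against Context.OperatorContext is an assumption:
    
        import com.datatorrent.api.Component;
        import com.datatorrent.api.Context;
    
        public interface BackupStore extends Component<Context.OperatorContext>
        {
          // setup(OperatorContext) and teardown() would come from Component, so the
          // operator would call store.setup(context) instead of the current no-arg setup().
          void isOuterJoin(boolean isOuter);
          boolean put(TimeEvent tuple);
          Object getValidTuples(TimeEvent tuple);
          Object getUnMatchedTuples();
          void endWindow();
          void checkpointed(long windowId);
          void committed(long windowId);
        }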



[GitHub] incubator-apex-malhar pull request: MLHR-1720 Implemented Inmemory...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r46127653
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + * @since 3.3.0
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = true;
    +      processTuple(tuple);
    +    }
    +  };
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = false;
    +      processTuple(tuple);
    +    }
    +  };
    +  // Stores for each of the input port
    +  private BackupStore[] store = (BackupStore[])Array.newInstance(BackupStore.class, 2);
    +  private String includeFieldStr;
    +  private String keyFieldStr;
    +  private String timeFieldStr;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store[0] == null) {
    +      throw new RuntimeException("Left Store is Empty");
    +    }
    +    if (store[1] == null) {
    +      throw new RuntimeException("Right Store is Empty");
    +    }
    +    // Checks whether the strategy is outer join and set it to store
    +    boolean isOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[0].isOuterJoin(isOuter);
    +    isOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[1].isOuterJoin(isOuter);
    +    // Setup the stores
    +    store[0].setup();
    +    store[1].setup();
    +
    +    populateFields();
    +  }
    +
    +  /**
    +   * Create the event with the given tuple. If it successfully inserted it into the store
    +   * then it does the join operation
    +   *
    +   * @param tuple Tuple to process
    +   */
    +  protected void processTuple(T tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    TimeEvent t = createEvent(tuple);
    +    if (store[idx].put(t)) {
    +      join(t, isLeft);
    +    }
    +  }
    +
    +  private void populateFields()
    +  {
    +    populateIncludeFields();
    +    populateKeyFields();
    +    if (timeFieldStr != null) {
    +      populateTimeFields();
    +    }
    +  }
    +
    +  /**
    +   * Populate the fields from the includeFiledStr
    +   */
    +  private void populateIncludeFields()
    +  {
    +    includeFields = new String[2][];
    +    String[] portFields = includeFieldStr.split(";");
    +    for (int i = 0; i < portFields.length; i++) {
    +      includeFields[i] = portFields[i].split(",");
    +    }
    +  }
    +
    +  /**
    +   * Get the tuples from another store based on join constraint and key
    +   *
    +   * @param tuple  input
    +   * @param isLeft whether the given tuple is from first port or not
    +   */
    +  private void join(TimeEvent tuple, Boolean isLeft)
    +  {
    +    // Get the valid tuples from the store based on key
    +    // If the tuple is null means the join type is outer and return unmatched tuples from store.
    +    Object value;
    +    if (isLeft) {
    +      if (tuple != null) {
    +        value = store[1].getValidTuples(tuple);
    +      } else {
    +        value = store[1].getUnMatchedTuples();
    +      }
    +    } else {
    +      if (tuple != null) {
    +        value = store[0].getValidTuples(tuple);
    +      } else {
    +        value = store[0].getUnMatchedTuples();
    +      }
    +    }
    +    // Join the input tuple with the joined tuples
    +    if (value != null) {
    +      ArrayList<TimeEvent> joinedValues = (ArrayList<TimeEvent>)value;
    +      List<T> result = new ArrayList<>();
    +      for (TimeEvent joinedValue : joinedValues) {
    +        T output = createOutputTuple();
    +        Object tupleValue = null;
    +        if (tuple != null) {
    +          tupleValue = tuple.getValue();
    +        }
    +        copyValue(output, tupleValue, isLeft);
    +        copyValue(output, joinedValue.getValue(), !isLeft);
    +        result.add(output);
    +        joinedValue.setMatch(true);
    +      }
    +      if (tuple != null) {
    +        tuple.setMatch(true);
    +      }
    +      if (result.size() != 0) {
    +        outputPort.emit(result);
    +      }
    +    }
    +  }
    +
    +  // Emit the unmatched tuples, if the strategy is outer join
    +  @Override
    +  public void endWindow()
    +  {
    +    if (strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, false);
    +    }
    +    if (strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, true);
    +    }
    +    store[0].endWindow();
    +    store[1].endWindow();
    +  }
    +
    +  @Override
    +  public void checkpointed(long windowId)
    +  {
    +    store[0].checkpointed(windowId);
    +    store[1].checkpointed(windowId);
    +  }
    +
    +  @Override
    +  public void committed(long windowId)
    +  {
    +    store[0].committed(windowId);
    +    store[1].committed(windowId);
    +  }
    +
    +  /**
    +   * Convert the given tuple to event
    +   *
    +   * @param tuple Given tuple to convert into event
    +   * @return event
    +   */
    +  protected TimeEvent createEvent(Object tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    if (timeFields != null) {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), (Long)getTime(timeFields[idx], tuple), tuple);
    +    } else {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), Calendar.getInstance().getTimeInMillis(), tuple);
    --- End diff --
    
    Added



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45859943
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + * @since 3.3.0
    --- End diff --
    
    The @since tag is not required.



[GitHub] incubator-apex-malhar pull request: MLHR-1720 Implemented Inmemory...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r46126616
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/TimeBasedStore.java ---
    @@ -0,0 +1,333 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import javax.validation.constraints.Min;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +/**
    + * Base implementation of time based store for key-value pair tuples.
    + *
    + * @param <T>
    + */
    +@InterfaceStability.Evolving
    +public class TimeBasedStore<T extends TimeEvent>
    +{
    --- End diff --
    
    I thought this would be a cleaner way. TimeBasedStore is a template store and acts as a plugin for InMemoryStore.
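    As a rough sketch of what I mean (the class relationship and setter names below are my assumption about the intended wiring, with method bodies and overrides elided; they are not the literal patch):
    
        // Assumed shape: InMemoryStore reuses the TimeBasedStore implementation
        // and exposes it to the operator through the BackupStore interface.
        public class InMemoryStore extends TimeBasedStore<TimeEvent> implements BackupStore
        {
          public InMemoryStore(long expiryTime, int bucketSpanInMillis)
          {
            setExpiryTime(expiryTime);                 // assumed setter on TimeBasedStore
            setBucketSpanInMillis(bucketSpanInMillis); // assumed setter on TimeBasedStore
          }
        }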


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45860044
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    --- End diff --
    
    Is it from two streams?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45862670
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + * @since 3.3.0
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = true;
    +      processTuple(tuple);
    +    }
    +  };
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = false;
    +      processTuple(tuple);
    +    }
    +  };
    +  // Stores for each of the input port
    +  private BackupStore[] store = (BackupStore[])Array.newInstance(BackupStore.class, 2);
    +  private String includeFieldStr;
    +  private String keyFieldStr;
    +  private String timeFieldStr;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store[0] == null) {
    +      throw new RuntimeException("Left Store is Empty");
    +    }
    +    if (store[1] == null) {
    +      throw new RuntimeException("Right Store is Empty");
    +    }
    +    // Checks whether the strategy is outer join and set it to store
    +    boolean isOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[0].isOuterJoin(isOuter);
    +    isOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[1].isOuterJoin(isOuter);
    +    // Setup the stores
    +    store[0].setup();
    +    store[1].setup();
    +
    +    populateFields();
    +  }
    +
    +  /**
    +   * Create the event with the given tuple. If it successfully inserted it into the store
    +   * then it does the join operation
    +   *
    +   * @param tuple Tuple to process
    +   */
    +  protected void processTuple(T tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    TimeEvent t = createEvent(tuple);
    +    if (store[idx].put(t)) {
    +      join(t, isLeft);
    +    }
    +  }
    +
    +  private void populateFields()
    +  {
    +    populateIncludeFields();
    +    populateKeyFields();
    +    if (timeFieldStr != null) {
    +      populateTimeFields();
    +    }
    +  }
    +
    +  /**
    +   * Populate the fields from the includeFieldStr
    +   */
    +  private void populateIncludeFields()
    +  {
    +    includeFields = new String[2][];
    +    String[] portFields = includeFieldStr.split(";");
    +    for (int i = 0; i < portFields.length; i++) {
    +      includeFields[i] = portFields[i].split(",");
    +    }
    +  }
    +
    +  /**
    +   * Get the tuples from another store based on join constraint and key
    +   *
    +   * @param tuple  input
    +   * @param isLeft whether the given tuple is from first port or not
    +   */
    +  private void join(TimeEvent tuple, Boolean isLeft)
    +  {
    +    // Get the valid tuples from the store based on key
    +    // If the tuple is null means the join type is outer and return unmatched tuples from store.
    +    Object value;
    +    if (isLeft) {
    +      if (tuple != null) {
    +        value = store[1].getValidTuples(tuple);
    +      } else {
    +        value = store[1].getUnMatchedTuples();
    +      }
    +    } else {
    +      if (tuple != null) {
    +        value = store[0].getValidTuples(tuple);
    +      } else {
    +        value = store[0].getUnMatchedTuples();
    +      }
    +    }
    +    // Join the input tuple with the joined tuples
    +    if (value != null) {
    +      ArrayList<TimeEvent> joinedValues = (ArrayList<TimeEvent>)value;
    +      List<T> result = new ArrayList<>();
    +      for (TimeEvent joinedValue : joinedValues) {
    +        T output = createOutputTuple();
    +        Object tupleValue = null;
    +        if (tuple != null) {
    +          tupleValue = tuple.getValue();
    +        }
    +        copyValue(output, tupleValue, isLeft);
    +        copyValue(output, joinedValue.getValue(), !isLeft);
    +        result.add(output);
    +        joinedValue.setMatch(true);
    +      }
    +      if (tuple != null) {
    +        tuple.setMatch(true);
    +      }
    +      if (result.size() != 0) {
    +        outputPort.emit(result);
    +      }
    +    }
    +  }
    +
    +  // Emit the unmatched tuples, if the strategy is outer join
    +  @Override
    +  public void endWindow()
    +  {
    +    if (strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, false);
    +    }
    +    if (strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, true);
    +    }
    +    store[0].endWindow();
    +    store[1].endWindow();
    +  }
    +
    +  @Override
    +  public void checkpointed(long windowId)
    +  {
    +    store[0].checkpointed(windowId);
    +    store[1].checkpointed(windowId);
    +  }
    +
    +  @Override
    +  public void committed(long windowId)
    +  {
    +    store[0].committed(windowId);
    +    store[1].committed(windowId);
    +  }
    +
    +  /**
    +   * Convert the given tuple to event
    +   *
    +   * @param tuple Given tuple to convert into event
    +   * @return event
    +   */
    +  protected TimeEvent createEvent(Object tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    if (timeFields != null) {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), (Long)getTime(timeFields[idx], tuple), tuple);
    +    } else {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), Calendar.getInstance().getTimeInMillis(), tuple);
    --- End diff --
    
    What happens if keys has 3 elements? From this it looks like that can't be handled, but the documentation says "List of comma separated key field for both the streams".
    
    How do I join on multiple keys?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1720 Implemented Inmemory...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r49056224
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/JoinStore.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.Component;
    +
    +/**
    + * <p>
    + * Interface of store for join operation.
    + * </p>
    + */
    +@InterfaceStability.Unstable
    +public interface JoinStore extends Component
    +{
    +  /**
    +   * Generate the store
    +   */
    +
    +  /**
    +   * Perform the committed operation
    +   * @param windowId
    +   */
    +  void committed(long windowId);
    +
    +  /**
    +   * Save the state of store
    +   * @param windowId
    +   */
    +  void checkpointed(long windowId);
    +
    +  /**
    +   * Add the operations, any needed for store before begin the window
    +   * @param windowId
    +   */
    +  void beginWindow(long windowId);
    +
    +  /**
    +   *
    +   */
    +  void endWindow();
    +
    +  /**
    +   * Get the key from the given tuple and with that key, get the tuples which satisfies the join constraint
    +   * from the store.
    +   *
    +   * @param tuple Given tuple
    +   * @return the valid tuples which satisfy the join constraint
    +   */
    +  List<?> getValidTuples(Object tuple);
    +
    +  /**
    +   * Insert the given tuple
    +   *
    +   * @param tuple Given tuple
    +   */
    +  Boolean put(Object tuple);
    --- End diff --
    
    I don't remember why the Boolean object was used. The return type should be a primitive boolean.
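    For reference, a sketch of the suggested signature:
    
        /**
         * Insert the given tuple
         *
         * @param tuple Given tuple
         * @return true if the tuple was inserted into the store
         */
        boolean put(Object tuple);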


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45861794
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + * @since 3.3.0
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = true;
    +      processTuple(tuple);
    +    }
    +  };
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = false;
    +      processTuple(tuple);
    +    }
    +  };
    +  // Stores for each of the input port
    +  private BackupStore[] store = (BackupStore[])Array.newInstance(BackupStore.class, 2);
    +  private String includeFieldStr;
    +  private String keyFieldStr;
    +  private String timeFieldStr;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store[0] == null) {
    +      throw new RuntimeException("Left Store is Empty");
    +    }
    +    if (store[1] == null) {
    +      throw new RuntimeException("Right Store is Empty");
    +    }
    +    // Checks whether the strategy is outer join and set it to store
    +    boolean isOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[0].isOuterJoin(isOuter);
    +    isOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[1].isOuterJoin(isOuter);
    +    // Setup the stores
    +    store[0].setup();
    +    store[1].setup();
    +
    +    populateFields();
    +  }
    +
    +  /**
    +   * Create the event with the given tuple. If it successfully inserted it into the store
    +   * then it does the join operation
    +   *
    +   * @param tuple Tuple to process
    +   */
    +  protected void processTuple(T tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    TimeEvent t = createEvent(tuple);
    +    if (store[idx].put(t)) {
    +      join(t, isLeft);
    +    }
    +  }
    +
    +  private void populateFields()
    +  {
    +    populateIncludeFields();
    +    populateKeyFields();
    +    if (timeFieldStr != null) {
    +      populateTimeFields();
    +    }
    +  }
    +
    +  /**
    +   * Populate the fields from the includeFieldStr
    +   */
    +  private void populateIncludeFields()
    +  {
    +    includeFields = new String[2][];
    +    String[] portFields = includeFieldStr.split(";");
    +    for (int i = 0; i < portFields.length; i++) {
    +      includeFields[i] = portFields[i].split(",");
    +    }
    +  }
    +
    +  /**
    +   * Get the tuples from another store based on join constraint and key
    +   *
    +   * @param tuple  input
    +   * @param isLeft whether the given tuple is from first port or not
    +   */
    +  private void join(TimeEvent tuple, Boolean isLeft)
    +  {
    +    // Get the valid tuples from the store based on key
    +    // If the tuple is null means the join type is outer and return unmatched tuples from store.
    +    Object value;
    +    if (isLeft) {
    +      if (tuple != null) {
    +        value = store[1].getValidTuples(tuple);
    +      } else {
    +        value = store[1].getUnMatchedTuples();
    +      }
    +    } else {
    +      if (tuple != null) {
    +        value = store[0].getValidTuples(tuple);
    +      } else {
    +        value = store[0].getUnMatchedTuples();
    +      }
    +    }
    +    // Join the input tuple with the joined tuples
    +    if (value != null) {
    +      ArrayList<TimeEvent> joinedValues = (ArrayList<TimeEvent>)value;
    +      List<T> result = new ArrayList<>();
    +      for (TimeEvent joinedValue : joinedValues) {
    +        T output = createOutputTuple();
    +        Object tupleValue = null;
    +        if (tuple != null) {
    +          tupleValue = tuple.getValue();
    +        }
    +        copyValue(output, tupleValue, isLeft);
    +        copyValue(output, joinedValue.getValue(), !isLeft);
    +        result.add(output);
    +        joinedValue.setMatch(true);
    +      }
    +      if (tuple != null) {
    +        tuple.setMatch(true);
    +      }
    +      if (result.size() != 0) {
    +        outputPort.emit(result);
    +      }
    +    }
    +  }
    +
    +  // Emit the unmatched tuples, if the strategy is outer join
    +  @Override
    +  public void endWindow()
    +  {
    +    if (strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, false);
    +    }
    +    if (strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, true);
    +    }
    +    store[0].endWindow();
    +    store[1].endWindow();
    +  }
    +
    +  @Override
    +  public void checkpointed(long windowId)
    +  {
    +    store[0].checkpointed(windowId);
    +    store[1].checkpointed(windowId);
    +  }
    +
    +  @Override
    +  public void committed(long windowId)
    +  {
    +    store[0].committed(windowId);
    +    store[1].committed(windowId);
    +  }
    +
    +  /**
    +   * Convert the given tuple to event
    +   *
    +   * @param tuple Given tuple to convert into event
    +   * @return event
    +   */
    +  protected TimeEvent createEvent(Object tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    if (timeFields != null) {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), (Long)getTime(timeFields[idx], tuple), tuple);
    +    } else {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), Calendar.getInstance().getTimeInMillis(), tuple);
    +    }
    +  }
    +
    +  public void populateKeyFields()
    +  {
    +    this.keys = keyFieldStr.split(",");
    +  }
    +
    +  public JoinStrategy getStrategy()
    +  {
    +    return strategy;
    +  }
    +
    +  public void setStrategy(JoinStrategy strategy)
    +  {
    +    this.strategy = strategy;
    +  }
    +
    +  public void setLeftStore(BackupStore lStore)
    --- End diff --
    
    Check here whether the store being set is null.
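    A sketch of that check (Guava's Preconditions is assumed to be available; a plain null check throwing IllegalArgumentException works just as well):
    
        public void setLeftStore(BackupStore lStore)
        {
          // Fail fast instead of deferring the null check to setup()
          store[0] = Preconditions.checkNotNull(lStore, "left store");
        }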


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1720 Implemented Inmemory...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r46710446
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/Store.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Component;
    +
    +/**
    + * <p>
    + * Interface of store for join operation.
    + * </p>
    + */
    +@Evolving
    +public interface Store extends Component
    --- End diff --
    
    It is too specific to join. Should it be called JoinStore?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45860110
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    --- End diff --
    
    Can you explain what timeField is, and the difference between keyFields and timeFields?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1720 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r46124072
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/TimeBasedStore.java ---
    @@ -0,0 +1,333 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import javax.validation.constraints.Min;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +/**
    + * Base implementation of time based store for key-value pair tuples.
    + *
    + * @param <T>
    + */
    +@InterfaceStability.Evolving
    +public class TimeBasedStore<T extends TimeEvent>
    +{
    --- End diff --
    
    Can you please explain the reason? I see it has the same functions as those defined in BackupStore.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45863234
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/BackupStore.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +/**
    + * <p>
    + * Interface of store for join operation.
    + * </p>
    + */
    +@Evolving
    +public interface BackupStore
    +{
    +  /**
    +   * Generate the store
    +   */
    +  void setup();
    +
    +  void committed(long windowId);
    +
    +  void checkpointed(long windowId);
    +
    +  void endWindow();
    +
    +  /**
    +   * Get the key from the given tuple and with that key, get the tuples which satisfies the join constraint
    +   * from the store.
    +   *
    +   * @param tuple Given tuple
    +   * @return the valid tuples which statisfies the join constraint
    +   */
    +  Object getValidTuples(Object tuple);
    --- End diff --
    
    Why is the return type Object and not List<TimeEvent>?
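    A sketch of the signature suggested by this comment:
    
        /**
         * @param tuple Given tuple
         * @return the valid tuples which satisfy the join constraint
         */
        List<TimeEvent> getValidTuples(Object tuple);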


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1720 Implemented Inmemory...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r46119349
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,343 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = true;
    +      processTuple(tuple);
    +    }
    +  };
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = false;
    +      processTuple(tuple);
    +    }
    +  };
    +  // Stores for each of the input port
    +  private BackupStore[] store = (BackupStore[])Array.newInstance(BackupStore.class, 2);
    +  private String includeFieldStr;
    +  private String keyFieldStr;
    +  private String timeFieldStr;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store[0] == null) {
    +      throw new RuntimeException("Left Store is Empty");
    +    }
    +    if (store[1] == null) {
    +      throw new RuntimeException("Right Store is Empty");
    +    }
    +    // Checks whether the strategy is outer join and set it to store
    +    boolean isOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[0].isOuterJoin(isOuter);
    +    isOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[1].isOuterJoin(isOuter);
    +    // Setup the stores
    +    store[0].setup();
    +    store[1].setup();
    +
    +    populateFields();
    +  }
    +
    +  /**
    +   * Create the event with the given tuple. If it successfully inserted it into the store
    +   * then it does the join operation
    +   *
    +   * @param tuple Tuple to process
    +   */
    +  protected void processTuple(T tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    TimeEvent t = createEvent(tuple);
    +    if (store[idx].put(t)) {
    +      join(t, isLeft);
    +    }
    +  }
    +
    +  private void populateFields()
    +  {
    +    populateIncludeFields();
    +    populateKeyFields();
    +    if (timeFieldStr != null) {
    +      populateTimeFields();
    +    }
    +  }
    +
    +  /**
    +   * Populate the fields from the includeFieldStr
    +   */
    +  private void populateIncludeFields()
    +  {
    +    includeFields = new String[2][];
    +    String[] portFields = includeFieldStr.split(";");
    +    for (int i = 0; i < portFields.length; i++) {
    +      includeFields[i] = portFields[i].split(",");
    +    }
    +  }
    +
    +  /**
    +   * Get the tuples from another store based on join constraint and key
    +   *
    +   * @param tuple  input
    +   * @param isLeft whether the given tuple is from first port or not
    +   */
    +  private void join(TimeEvent tuple, Boolean isLeft)
    +  {
    +    // Get the valid tuples from the store based on key
    +    // If the tuple is null means the join type is outer and return unmatched tuples from store.
    +    Object value;
    +    if (isLeft) {
    +      if (tuple != null) {
    +        value = store[1].getValidTuples(tuple);
    +      } else {
    +        value = store[1].getUnMatchedTuples();
    +      }
    +    } else {
    +      if (tuple != null) {
    +        value = store[0].getValidTuples(tuple);
    +      } else {
    +        value = store[0].getUnMatchedTuples();
    +      }
    +    }
    +    // Join the input tuple with the joined tuples
    +    if (value != null) {
    +      ArrayList<TimeEvent> joinedValues = (ArrayList<TimeEvent>)value;
    +      List<T> result = new ArrayList<>();
    +      for (TimeEvent joinedValue : joinedValues) {
    +        T output = createOutputTuple();
    +        Object tupleValue = null;
    +        if (tuple != null) {
    +          tupleValue = tuple.getValue();
    +        }
    +        copyValue(output, tupleValue, isLeft);
    +        copyValue(output, joinedValue.getValue(), !isLeft);
    +        result.add(output);
    +        joinedValue.setMatch(true);
    +      }
    +      if (tuple != null) {
    +        tuple.setMatch(true);
    +      }
    +      if (result.size() != 0) {
    +        outputPort.emit(result);
    +      }
    +    }
    +  }
    +
    +  // Emit the unmatched tuples, if the strategy is outer join
    +  @Override
    +  public void endWindow()
    +  {
    +    if (strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, false);
    +    }
    +    if (strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, true);
    +    }
    +    store[0].endWindow();
    +    store[1].endWindow();
    +  }
    +
    +  @Override
    +  public void checkpointed(long windowId)
    +  {
    +    store[0].checkpointed(windowId);
    +    store[1].checkpointed(windowId);
    +  }
    +
    +  @Override
    +  public void committed(long windowId)
    +  {
    +    store[0].committed(windowId);
    +    store[1].committed(windowId);
    +  }
    +
    +  /**
    +   * Convert the given tuple to event
    +   *
    +   * @param tuple Given tuple to convert into event
    +   * @return event
    +   */
    +  protected TimeEvent createEvent(Object tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    if (timeFields != null) {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), (Long)getTime(timeFields[idx], tuple), tuple);
    +    } else {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), Calendar.getInstance().getTimeInMillis(), tuple);
    +    }
    +  }
    +
    +  public void populateKeyFields()
    +  {
    +    this.keys = keyFieldStr.split(",");
    +  }
    +
    +  public JoinStrategy getStrategy()
    +  {
    +    return strategy;
    +  }
    +
    +  public void setStrategy(JoinStrategy strategy)
    +  {
    +    this.strategy = strategy;
    +  }
    +
    +  public void setLeftStore(BackupStore lStore)
    +  {
    +    store[0] = lStore;
    +  }
    +
    +  public void setRightStore(BackupStore rStore)
    +  {
    +    store[1] = rStore;
    +  }
    +
    +  public void setKeyFields(String keyFieldStr)
    +  {
    +    this.keyFieldStr = keyFieldStr;
    --- End diff --
    
    I made the keys, includeFields, and timeFields arrays transient, so I am not incorporating this review comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r46111198
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    --- End diff --
    
    The join operator is time based; timeField specifies which field of the tuple carries the timestamp.
    For example, one input port receives Customer details and the other port receives Order details.
    The schema for Customer is of the form <ID, Name, RTime>.
    The schema for Order is of the form <OID, OTime, CID, Amount>.
    Now, join the tuples of the Customer and Order streams where Customer.ID = Order.CID, with the constraint that matched tuples must have timestamps within 5 minutes of each other.
    Here, key fields = ID, CID and time fields = RTime, OTime.
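    A minimal configuration sketch for that example, assuming a concrete POJO-based subclass of AbstractJoinOperator and an in-memory BackupStore. Only setKeyFields, setStrategy, setLeftStore and setRightStore appear in this diff; the store class, its constructor arguments and the time/include-field setters are illustrative assumptions:
    
        AbstractJoinOperator<Object> join = new POJOJoinOperator();     // assumed concrete subclass
        join.setLeftStore(new InMemoryStore(300000, 30000));            // assumed store: 5 min expiry, 30 s buckets
        join.setRightStore(new InMemoryStore(300000, 30000));
        join.setKeyFields("ID,CID");                                    // Customer.ID joined with Order.CID
        join.setTimeFields("RTime,OTime");                              // assumed setter for timeFieldStr
        join.setIncludeFields("ID,Name;OID,Amount");                    // assumed setter for includeFieldStr
        join.setStrategy(JoinStrategy.INNER_JOIN);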
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45860815
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + * @since 3.3.0
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = true;
    +      processTuple(tuple);
    +    }
    +  };
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = false;
    +      processTuple(tuple);
    +    }
    +  };
    +  // Stores for each of the input port
    +  private BackupStore[] store = (BackupStore[])Array.newInstance(BackupStore.class, 2);
    +  private String includeFieldStr;
    +  private String keyFieldStr;
    +  private String timeFieldStr;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store[0] == null) {
    +      throw new RuntimeException("Left Store is Empty");
    --- End diff --
    
    When will store[0] and store[1] be null?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45863667
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + * @since 3.3.0
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = true;
    +      processTuple(tuple);
    +    }
    +  };
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = false;
    +      processTuple(tuple);
    +    }
    +  };
    +  // Stores for each of the input port
    +  private BackupStore[] store = (BackupStore[])Array.newInstance(BackupStore.class, 2);
    +  private String includeFieldStr;
    +  private String keyFieldStr;
    +  private String timeFieldStr;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store[0] == null) {
    +      throw new RuntimeException("Left Store is Empty");
    +    }
    +    if (store[1] == null) {
    +      throw new RuntimeException("Right Store is Empty");
    +    }
    +    // Checks whether the strategy is outer join and set it to store
    +    boolean isOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[0].isOuterJoin(isOuter);
    +    isOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[1].isOuterJoin(isOuter);
    +    // Setup the stores
    +    store[0].setup();
    +    store[1].setup();
    +
    +    populateFields();
    +  }
    +
    +  /**
    +   * Create the event with the given tuple. If it successfully inserted it into the store
    +   * then it does the join operation
    +   *
    +   * @param tuple Tuple to process
    +   */
    +  protected void processTuple(T tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    TimeEvent t = createEvent(tuple);
    +    if (store[idx].put(t)) {
    +      join(t, isLeft);
    +    }
    +  }
    +
    +  private void populateFields()
    +  {
    +    populateIncludeFields();
    +    populateKeyFields();
    +    if (timeFieldStr != null) {
    +      populateTimeFields();
    +    }
    +  }
    +
    +  /**
    +   * Populate the fields from the includeFieldStr
    +   */
    +  private void populateIncludeFields()
    +  {
    +    includeFields = new String[2][];
    +    String[] portFields = includeFieldStr.split(";");
    +    for (int i = 0; i < portFields.length; i++) {
    +      includeFields[i] = portFields[i].split(",");
    +    }
    +  }
    +
    +  /**
    +   * Get the tuples from another store based on join constraint and key
    +   *
    +   * @param tuple  input
    +   * @param isLeft whether the given tuple is from first port or not
    +   */
    +  private void join(TimeEvent tuple, Boolean isLeft)
    +  {
    +    // Get the valid tuples from the store based on key
    +    // If the tuple is null means the join type is outer and return unmatched tuples from store.
    +    Object value;
    --- End diff --
    
    If value is cast to ArrayList<TimeEvent> (joinedValues), why declare it as Object in the first place?
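    Something like this would avoid the cast, assuming getValidTuples()/getUnMatchedTuples() were re-declared to return List<TimeEvent>, which the current diff does not do:

        // Sketch only: assumes the store methods return List<TimeEvent>.
        BackupStore otherStore = isLeft ? store[1] : store[0];
        List<TimeEvent> joinedValues = (tuple != null)
            ? otherStore.getValidTuples(tuple)
            : otherStore.getUnMatchedTuples();
        if (joinedValues == null) {
          return;
        }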



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45945517
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,343 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = true;
    +      processTuple(tuple);
    +    }
    +  };
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = false;
    +      processTuple(tuple);
    +    }
    +  };
    +  // Stores for each of the input port
    +  private BackupStore[] store = (BackupStore[])Array.newInstance(BackupStore.class, 2);
    +  private String includeFieldStr;
    +  private String keyFieldStr;
    +  private String timeFieldStr;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store[0] == null) {
    +      throw new RuntimeException("Left Store is Empty");
    +    }
    +    if (store[1] == null) {
    +      throw new RuntimeException("Right Store is Empty");
    +    }
    +    // Checks whether the strategy is outer join and set it to store
    +    boolean isOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[0].isOuterJoin(isOuter);
    +    isOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[1].isOuterJoin(isOuter);
    +    // Setup the stores
    +    store[0].setup();
    +    store[1].setup();
    +
    +    populateFields();
    --- End diff --
    
    This is called every time setup() is called, so why are timeFields[], keys[], and includeFields[] defined as non-transient?
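    In other words they could be rebuilt from the *FieldStr strings on every activation, e.g. (sketch only):

        // Derived in setup() from keyFieldStr / timeFieldStr / includeFieldStr,
        // so they do not need to be part of the checkpointed state.
        protected transient String[] timeFields;
        protected transient String[][] includeFields;
        protected transient String[] keys;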



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45944437
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/BackupStore.java ---
    @@ -0,0 +1,76 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +/**
    + * <p>
    + * Interface of store for join operation.
    + * </p>
    + */
    +@Evolving
    +public interface BackupStore
    +{
    +  /**
    +   * Generate the store
    +   */
    +  void setup();
    +
    +  void committed(long windowId);
    +
    +  void checkpointed(long windowId);
    +
    +  void endWindow();
    +
    +  /**
    +   * Get the key from the given tuple and with that key, get the tuples which satisfies the join constraint
    +   * from the store.
    +   *
    +   * @param tuple Given tuple
    +   * @return the valid tuples that satisfy the join constraint
    +   */
    +  Object getValidTuples(Object tuple);
    +
    +  /**
    +   * Insert the given tuple
    +   *
    +   * @param tuple Given tuple
    +   */
    +  Boolean put(Object tuple);
    +
    +  /**
    +   * Shutdown the services.
    +   */
    +  void shutdown();
    +
    +  /**
    +   * Return the unmatched events from store
    +   *
    +   * @return the unmatched events
    +   */
    +  Object getUnMatchedTuples();
    --- End diff --
    
    Can this be a List?
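    i.e. something like this, plus a java.util.List import (a typed variant suggested here, not what the diff currently declares):

        /**
         * Return the unmatched events from the store.
         *
         * @return the unmatched events
         */
        List<TimeEvent> getUnMatchedTuples();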



[GitHub] incubator-apex-malhar pull request: MLHR-1720 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r46249177
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/TimeBasedStore.java ---
    @@ -0,0 +1,333 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import javax.validation.constraints.Min;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +/**
    + * Base implementation of time based store for key-value pair tuples.
    + *
    + * @param <T>
    + */
    +@InterfaceStability.Evolving
    +public class TimeBasedStore<T extends TimeEvent>
    +{
    --- End diff --
    
    I am not sure if that is happening



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45945558
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,343 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = true;
    +      processTuple(tuple);
    +    }
    +  };
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = false;
    +      processTuple(tuple);
    +    }
    +  };
    +  // Stores for each of the input port
    +  private BackupStore[] store = (BackupStore[])Array.newInstance(BackupStore.class, 2);
    +  private String includeFieldStr;
    +  private String keyFieldStr;
    +  private String timeFieldStr;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store[0] == null) {
    +      throw new RuntimeException("Left Store is Empty");
    +    }
    +    if (store[1] == null) {
    +      throw new RuntimeException("Right Store is Empty");
    +    }
    +    // Checks whether the strategy is outer join and set it to store
    +    boolean isOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[0].isOuterJoin(isOuter);
    +    isOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[1].isOuterJoin(isOuter);
    +    // Setup the stores
    +    store[0].setup();
    +    store[1].setup();
    +
    +    populateFields();
    +  }
    +
    +  /**
    +   * Create the event with the given tuple. If it successfully inserted it into the store
    +   * then it does the join operation
    +   *
    +   * @param tuple Tuple to process
    +   */
    +  protected void processTuple(T tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    TimeEvent t = createEvent(tuple);
    +    if (store[idx].put(t)) {
    +      join(t, isLeft);
    +    }
    +  }
    +
    +  private void populateFields()
    +  {
    +    populateIncludeFields();
    +    populateKeyFields();
    +    if (timeFieldStr != null) {
    +      populateTimeFields();
    +    }
    +  }
    +
    +  /**
    +   * Populate the fields from the includeFieldStr
    +   */
    +  private void populateIncludeFields()
    +  {
    +    includeFields = new String[2][];
    +    String[] portFields = includeFieldStr.split(";");
    +    for (int i = 0; i < portFields.length; i++) {
    +      includeFields[i] = portFields[i].split(",");
    +    }
    +  }
    +
    +  /**
    +   * Get the tuples from another store based on join constraint and key
    +   *
    +   * @param tuple  input
    +   * @param isLeft whether the given tuple is from first port or not
    +   */
    +  private void join(TimeEvent tuple, Boolean isLeft)
    +  {
    +    // Get the valid tuples from the store based on key
    +    // If the tuple is null means the join type is outer and return unmatched tuples from store.
    +    Object value;
    +    if (isLeft) {
    +      if (tuple != null) {
    +        value = store[1].getValidTuples(tuple);
    +      } else {
    +        value = store[1].getUnMatchedTuples();
    +      }
    +    } else {
    +      if (tuple != null) {
    +        value = store[0].getValidTuples(tuple);
    +      } else {
    +        value = store[0].getUnMatchedTuples();
    +      }
    +    }
    +    // Join the input tuple with the joined tuples
    +    if (value != null) {
    +      ArrayList<TimeEvent> joinedValues = (ArrayList<TimeEvent>)value;
    +      List<T> result = new ArrayList<>();
    +      for (TimeEvent joinedValue : joinedValues) {
    +        T output = createOutputTuple();
    +        Object tupleValue = null;
    +        if (tuple != null) {
    +          tupleValue = tuple.getValue();
    +        }
    +        copyValue(output, tupleValue, isLeft);
    +        copyValue(output, joinedValue.getValue(), !isLeft);
    +        result.add(output);
    +        joinedValue.setMatch(true);
    +      }
    +      if (tuple != null) {
    +        tuple.setMatch(true);
    +      }
    +      if (result.size() != 0) {
    +        outputPort.emit(result);
    +      }
    +    }
    +  }
    +
    +  // Emit the unmatched tuples, if the strategy is outer join
    +  @Override
    --- End diff --
    
    The case here is when the strategy is not INNER_JOIN. In that case, tuples that were not joined within the expiry period have the join applied after they expire.
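    In other words, once the window closes endWindow() drains the side that is allowed to emit unmatched tuples, roughly like this (a sketch of the behaviour described above):

        if (strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
          join(null, false);  // left-store tuples that expired without finding a match
        }
        if (strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
          join(null, true);   // right-store tuples that expired without finding a match
        }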



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45945847
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,343 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = true;
    +      processTuple(tuple);
    +    }
    +  };
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = false;
    +      processTuple(tuple);
    +    }
    +  };
    +  // Stores for each of the input port
    +  private BackupStore[] store = (BackupStore[])Array.newInstance(BackupStore.class, 2);
    +  private String includeFieldStr;
    +  private String keyFieldStr;
    +  private String timeFieldStr;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store[0] == null) {
    +      throw new RuntimeException("Left Store is Empty");
    +    }
    +    if (store[1] == null) {
    +      throw new RuntimeException("Right Store is Empty");
    +    }
    +    // Checks whether the strategy is outer join and set it to store
    +    boolean isOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[0].isOuterJoin(isOuter);
    +    isOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[1].isOuterJoin(isOuter);
    +    // Setup the stores
    +    store[0].setup();
    +    store[1].setup();
    +
    +    populateFields();
    +  }
    +
    +  /**
    +   * Create the event with the given tuple. If it successfully inserted it into the store
    +   * then it does the join operation
    +   *
    +   * @param tuple Tuple to process
    +   */
    +  protected void processTuple(T tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    TimeEvent t = createEvent(tuple);
    +    if (store[idx].put(t)) {
    +      join(t, isLeft);
    +    }
    +  }
    +
    +  private void populateFields()
    +  {
    +    populateIncludeFields();
    +    populateKeyFields();
    +    if (timeFieldStr != null) {
    +      populateTimeFields();
    +    }
    +  }
    +
    +  /**
    +   * Populate the fields from the includeFieldStr
    +   */
    +  private void populateIncludeFields()
    +  {
    +    includeFields = new String[2][];
    +    String[] portFields = includeFieldStr.split(";");
    +    for (int i = 0; i < portFields.length; i++) {
    +      includeFields[i] = portFields[i].split(",");
    +    }
    +  }
    +
    +  /**
    +   * Get the tuples from another store based on join constraint and key
    +   *
    +   * @param tuple  input
    +   * @param isLeft whether the given tuple is from first port or not
    +   */
    +  private void join(TimeEvent tuple, Boolean isLeft)
    +  {
    +    // Get the valid tuples from the store based on key
    +    // If the tuple is null means the join type is outer and return unmatched tuples from store.
    +    Object value;
    +    if (isLeft) {
    +      if (tuple != null) {
    +        value = store[1].getValidTuples(tuple);
    +      } else {
    +        value = store[1].getUnMatchedTuples();
    +      }
    +    } else {
    +      if (tuple != null) {
    +        value = store[0].getValidTuples(tuple);
    +      } else {
    +        value = store[0].getUnMatchedTuples();
    +      }
    +    }
    +    // Join the input tuple with the joined tuples
    +    if (value != null) {
    +      ArrayList<TimeEvent> joinedValues = (ArrayList<TimeEvent>)value;
    +      List<T> result = new ArrayList<>();
    +      for (TimeEvent joinedValue : joinedValues) {
    +        T output = createOutputTuple();
    +        Object tupleValue = null;
    +        if (tuple != null) {
    +          tupleValue = tuple.getValue();
    +        }
    +        copyValue(output, tupleValue, isLeft);
    +        copyValue(output, joinedValue.getValue(), !isLeft);
    +        result.add(output);
    +        joinedValue.setMatch(true);
    +      }
    +      if (tuple != null) {
    +        tuple.setMatch(true);
    +      }
    +      if (result.size() != 0) {
    +        outputPort.emit(result);
    +      }
    +    }
    +  }
    +
    +  // Emit the unmatched tuples, if the strategy is outer join
    +  @Override
    +  public void endWindow()
    +  {
    +    if (strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, false);
    +    }
    +    if (strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, true);
    +    }
    +    store[0].endWindow();
    +    store[1].endWindow();
    +  }
    +
    +  @Override
    +  public void checkpointed(long windowId)
    +  {
    +    store[0].checkpointed(windowId);
    +    store[1].checkpointed(windowId);
    +  }
    +
    +  @Override
    +  public void committed(long windowId)
    +  {
    +    store[0].committed(windowId);
    +    store[1].committed(windowId);
    +  }
    +
    +  /**
    +   * Convert the given tuple to event
    +   *
    +   * @param tuple Given tuple to convert into event
    +   * @return event
    +   */
    +  protected TimeEvent createEvent(Object tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    if (timeFields != null) {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), (Long)getTime(timeFields[idx], tuple), tuple);
    +    } else {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), Calendar.getInstance().getTimeInMillis(), tuple);
    +    }
    +  }
    +
    +  public void populateKeyFields()
    +  {
    --- End diff --
    
    Why is this public? The only usage I see is in populateFields().
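    i.e. it could simply be:

        private void populateKeyFields()
        {
          this.keys = keyFieldStr.split(",");
        }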



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45945908
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,343 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = true;
    +      processTuple(tuple);
    +    }
    +  };
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = false;
    +      processTuple(tuple);
    +    }
    +  };
    +  // Stores for each of the input port
    +  private BackupStore[] store = (BackupStore[])Array.newInstance(BackupStore.class, 2);
    +  private String includeFieldStr;
    +  private String keyFieldStr;
    +  private String timeFieldStr;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store[0] == null) {
    +      throw new RuntimeException("Left Store is Empty");
    +    }
    +    if (store[1] == null) {
    +      throw new RuntimeException("Right Store is Empty");
    +    }
    +    // Checks whether the strategy is outer join and set it to store
    +    boolean isOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[0].isOuterJoin(isOuter);
    +    isOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[1].isOuterJoin(isOuter);
    +    // Setup the stores
    +    store[0].setup();
    +    store[1].setup();
    +
    +    populateFields();
    +  }
    +
    +  /**
    +   * Create the event with the given tuple. If it successfully inserted it into the store
    +   * then it does the join operation
    +   *
    +   * @param tuple Tuple to process
    +   */
    +  protected void processTuple(T tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    TimeEvent t = createEvent(tuple);
    +    if (store[idx].put(t)) {
    +      join(t, isLeft);
    +    }
    +  }
    +
    +  private void populateFields()
    +  {
    +    populateIncludeFields();
    +    populateKeyFields();
    +    if (timeFieldStr != null) {
    +      populateTimeFields();
    +    }
    +  }
    +
    +  /**
    +   * Populate the fields from the includeFieldStr
    +   */
    +  private void populateIncludeFields()
    +  {
    +    includeFields = new String[2][];
    +    String[] portFields = includeFieldStr.split(";");
    +    for (int i = 0; i < portFields.length; i++) {
    +      includeFields[i] = portFields[i].split(",");
    +    }
    +  }
    +
    +  /**
    +   * Get the tuples from another store based on join constraint and key
    +   *
    +   * @param tuple  input
    +   * @param isLeft whether the given tuple is from first port or not
    +   */
    +  private void join(TimeEvent tuple, Boolean isLeft)
    +  {
    +    // Get the valid tuples from the store based on key
    +    // If the tuple is null means the join type is outer and return unmatched tuples from store.
    +    Object value;
    +    if (isLeft) {
    +      if (tuple != null) {
    +        value = store[1].getValidTuples(tuple);
    +      } else {
    +        value = store[1].getUnMatchedTuples();
    +      }
    +    } else {
    +      if (tuple != null) {
    +        value = store[0].getValidTuples(tuple);
    +      } else {
    +        value = store[0].getUnMatchedTuples();
    +      }
    +    }
    +    // Join the input tuple with the joined tuples
    +    if (value != null) {
    +      ArrayList<TimeEvent> joinedValues = (ArrayList<TimeEvent>)value;
    +      List<T> result = new ArrayList<>();
    +      for (TimeEvent joinedValue : joinedValues) {
    +        T output = createOutputTuple();
    +        Object tupleValue = null;
    +        if (tuple != null) {
    +          tupleValue = tuple.getValue();
    +        }
    +        copyValue(output, tupleValue, isLeft);
    +        copyValue(output, joinedValue.getValue(), !isLeft);
    +        result.add(output);
    +        joinedValue.setMatch(true);
    +      }
    +      if (tuple != null) {
    +        tuple.setMatch(true);
    +      }
    +      if (result.size() != 0) {
    +        outputPort.emit(result);
    +      }
    +    }
    +  }
    +
    +  // Emit the unmatched tuples, if the strategy is outer join
    +  @Override
    +  public void endWindow()
    +  {
    +    if (strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, false);
    +    }
    +    if (strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, true);
    +    }
    +    store[0].endWindow();
    +    store[1].endWindow();
    +  }
    +
    +  @Override
    +  public void checkpointed(long windowId)
    +  {
    +    store[0].checkpointed(windowId);
    +    store[1].checkpointed(windowId);
    +  }
    +
    +  @Override
    +  public void committed(long windowId)
    +  {
    +    store[0].committed(windowId);
    +    store[1].committed(windowId);
    +  }
    +
    +  /**
    +   * Convert the given tuple to event
    +   *
    +   * @param tuple Given tuple to convert into event
    +   * @return event
    +   */
    +  protected TimeEvent createEvent(Object tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    if (timeFields != null) {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), (Long)getTime(timeFields[idx], tuple), tuple);
    +    } else {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), Calendar.getInstance().getTimeInMillis(), tuple);
    +    }
    +  }
    +
    +  public void populateKeyFields()
    +  {
    +    this.keys = keyFieldStr.split(",");
    +  }
    +
    +  public JoinStrategy getStrategy()
    +  {
    +    return strategy;
    +  }
    +
    +  public void setStrategy(JoinStrategy strategy)
    +  {
    +    this.strategy = strategy;
    +  }
    +
    +  public void setLeftStore(BackupStore lStore)
    +  {
    +    store[0] = lStore;
    +  }
    +
    +  public void setRightStore(BackupStore rStore)
    +  {
    +    store[1] = rStore;
    +  }
    +
    +  public void setKeyFields(String keyFieldStr)
    +  {
    +    this.keyFieldStr = keyFieldStr;
    --- End diff --
    
    Shouldn't populateKeyFields() be called here instead of in setup(), and similarly for the other fields?
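    i.e. something along these lines (sketch only), so the derived array is ready as soon as the property is set:

        public void setKeyFields(String keyFieldStr)
        {
          this.keyFieldStr = keyFieldStr;
          this.keys = keyFieldStr.split(",");  // populate eagerly instead of waiting for setup()
        }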



[GitHub] incubator-apex-malhar pull request: MLHR-1720 Implemented Inmemory...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r46722810
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,349 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from two streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + *
    + * <b> Example: </b> <br>
    + *  Left input port receives customer details and right input port receives Order details.
    + *  Schema for the Customer be in the form of
    + *  Schema for the Order be in the form of
    + *  Now, Join the tuples of Customer and Order streams where Customer.ID = Order.CID and the constraint is
    + *  matched tuples must have timestamp within 5 minutes.
    + *  Here, key Fields = ID, CID and Time Fields = RTime, OTime, expiryTime = 5 minutes </b> <br>
    + *
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +
    +  @Size(min = 2, max = 2)
    +  protected transient String[] timeFields;
    --- End diff --
    
    @chaithu14 
    I think it is better to have a class, say StoreContext, that encompasses { timeField, keyField, includeFields, store }.
    Then you can have two fields, leftStoreContext and rightStoreContext. It makes the code more intuitive and easier to understand.
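    Rough sketch of what I mean (names are only suggestions):

        // Groups the per-port configuration that is currently spread across parallel arrays.
        public static class StoreContext
        {
          private String keyField;
          private String timeField;
          private String[] includeFields;
          private BackupStore store;
          // getters/setters omitted
        }

        protected StoreContext leftStoreContext;
        protected StoreContext rightStoreContext;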



[GitHub] incubator-apex-malhar pull request: MLHR-1720 Implemented Inmemory...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-malhar/pull/113



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45945126
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/BackupStore.java ---
    @@ -0,0 +1,76 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +/**
    + * <p>
    + * Interface of store for join operation.
    + * </p>
    + */
    +@Evolving
    +public interface BackupStore
    +{
    +  /**
    +   * Generate the store
    +   */
    +  void setup();
    +
    +  void committed(long windowId);
    --- End diff --
    
    Can you add Javadoc for these calls?
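    For example, something along these lines (wording is only a suggestion; it mirrors the Operator.CheckpointListener semantics):

        /**
         * Called when the operator state up to the given window has been checkpointed.
         *
         * @param windowId id of the checkpointed window
         */
        void checkpointed(long windowId);

        /**
         * Called when the given window has been committed across the DAG,
         * so state up to that window can safely be purged from the store.
         *
         * @param windowId id of the committed window
         */
        void committed(long windowId);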



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45861810
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on constraint and emit the joined value.
    + * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + * @since 3.3.0
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = true;
    +      processTuple(tuple);
    +    }
    +  };
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = false;
    +      processTuple(tuple);
    +    }
    +  };
    +  // Stores for each of the input port
    +  private BackupStore[] store = (BackupStore[])Array.newInstance(BackupStore.class, 2);
    +  private String includeFieldStr;
    +  private String keyFieldStr;
    +  private String timeFieldStr;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store[0] == null) {
    +      throw new RuntimeException("Left Store is Empty");
    +    }
    +    if (store[1] == null) {
    +      throw new RuntimeException("Right Store is Empty");
    +    }
    +    // Checks whether the strategy is outer join and set it to store
    +    boolean isOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[0].isOuterJoin(isOuter);
    +    isOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[1].isOuterJoin(isOuter);
    +    // Setup the stores
    +    store[0].setup();
    +    store[1].setup();
    +
    +    populateFields();
    +  }
    +
    +  /**
    +   * Create the event with the given tuple. If it successfully inserted it into the store
    +   * then it does the join operation
    +   *
    +   * @param tuple Tuple to process
    +   */
    +  protected void processTuple(T tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    TimeEvent t = createEvent(tuple);
    +    if (store[idx].put(t)) {
    +      join(t, isLeft);
    +    }
    +  }
    +
    +  private void populateFields()
    +  {
    +    populateIncludeFields();
    +    populateKeyFields();
    +    if (timeFieldStr != null) {
    +      populateTimeFields();
    +    }
    +  }
    +
    +  /**
    +   * Populate the fields from the includeFieldStr
    +   */
    +  private void populateIncludeFields()
    +  {
    +    includeFields = new String[2][];
    +    String[] portFields = includeFieldStr.split(";");
    +    for (int i = 0; i < portFields.length; i++) {
    +      includeFields[i] = portFields[i].split(",");
    +    }
    +  }
    +
    +  /**
    +   * Get the tuples from another store based on join constraint and key
    +   *
    +   * @param tuple  input
    +   * @param isLeft whether the given tuple is from the first port or not
    +   */
    +  private void join(TimeEvent tuple, Boolean isLeft)
    +  {
    +    // Get the valid tuples from the store based on the key.
    +    // If the tuple is null, the join type is outer, so return the unmatched tuples from the store.
    +    Object value;
    +    if (isLeft) {
    +      if (tuple != null) {
    +        value = store[1].getValidTuples(tuple);
    +      } else {
    +        value = store[1].getUnMatchedTuples();
    +      }
    +    } else {
    +      if (tuple != null) {
    +        value = store[0].getValidTuples(tuple);
    +      } else {
    +        value = store[0].getUnMatchedTuples();
    +      }
    +    }
    +    // Join the input tuple with the joined tuples
    +    if (value != null) {
    +      ArrayList<TimeEvent> joinedValues = (ArrayList<TimeEvent>)value;
    +      List<T> result = new ArrayList<>();
    +      for (TimeEvent joinedValue : joinedValues) {
    +        T output = createOutputTuple();
    +        Object tupleValue = null;
    +        if (tuple != null) {
    +          tupleValue = tuple.getValue();
    +        }
    +        copyValue(output, tupleValue, isLeft);
    +        copyValue(output, joinedValue.getValue(), !isLeft);
    +        result.add(output);
    +        joinedValue.setMatch(true);
    +      }
    +      if (tuple != null) {
    +        tuple.setMatch(true);
    +      }
    +      if (result.size() != 0) {
    +        outputPort.emit(result);
    +      }
    +    }
    +  }
    +
    +  // Emit the unmatched tuples, if the strategy is outer join
    +  @Override
    +  public void endWindow()
    +  {
    +    if (strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, false);
    +    }
    +    if (strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
    +      join(null, true);
    +    }
    +    store[0].endWindow();
    +    store[1].endWindow();
    +  }
    +
    +  @Override
    +  public void checkpointed(long windowId)
    +  {
    +    store[0].checkpointed(windowId);
    +    store[1].checkpointed(windowId);
    +  }
    +
    +  @Override
    +  public void committed(long windowId)
    +  {
    +    store[0].committed(windowId);
    +    store[1].committed(windowId);
    +  }
    +
    +  /**
    +   * Convert the given tuple to event
    +   *
    +   * @param tuple Given tuple to convert into event
    +   * @return event
    +   */
    +  protected TimeEvent createEvent(Object tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    if (timeFields != null) {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), (Long)getTime(timeFields[idx], tuple), tuple);
    +    } else {
    +      return new TimeEventImpl(getKeyValue(keys[idx], tuple), Calendar.getInstance().getTimeInMillis(), tuple);
    +    }
    +  }
    +
    +  public void populateKeyFields()
    +  {
    +    this.keys = keyFieldStr.split(",");
    +  }
    +
    +  public JoinStrategy getStrategy()
    +  {
    +    return strategy;
    +  }
    +
    +  public void setStrategy(JoinStrategy strategy)
    +  {
    +    this.strategy = strategy;
    +  }
    +
    +  public void setLeftStore(BackupStore lStore)
    +  {
    +    store[0] = lStore;
    +  }
    +
    +  public void setRightStore(BackupStore rStore)
    --- End diff --
    
    Check here whether the store being set is null.
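
    For illustration, a minimal sketch of such a check (not part of the patch; it uses a plain null check so no extra dependency is needed):

        public void setRightStore(BackupStore rStore)
        {
          // fail fast on misconfiguration instead of deferring the failure to setup()
          if (rStore == null) {
            throw new IllegalArgumentException("Right store must not be null");
          }
          store[1] = rStore;  // index 1 holds the store for the right input port
        }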



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45898050
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,343 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on the constraint and emits the joined value.
    + * Subclasses should provide implementations of the createOutputTuple, copyValue, getKeyValue and getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key fields, one for each stream. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time fields, one for each stream. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = true;
    +      processTuple(tuple);
    +    }
    +  };
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = false;
    +      processTuple(tuple);
    +    }
    +  };
    +  // Stores for each of the input port
    +  private BackupStore[] store = (BackupStore[])Array.newInstance(BackupStore.class, 2);
    +  private String includeFieldStr;
    +  private String keyFieldStr;
    +  private String timeFieldStr;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store[0] == null) {
    --- End diff --
    
    @chaithu14 if the stores are annotated as @NotNull then this can be validated before launch. I think you made store a 2-element array, which is why you have to check it at run time.
    There are validation constraints that you can explore to make this validation happen before launch for an array, but I would suggest having the 2 stores as separate fields.
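
    For illustration, a sketch of that suggestion (field and setter names are hypothetical, and it assumes the javax.validation @NotNull constraint that is evaluated before launch):

        import javax.validation.constraints.NotNull;

        @NotNull
        private BackupStore leftStore;

        @NotNull
        private BackupStore rightStore;

        public void setLeftStore(BackupStore leftStore)
        {
          this.leftStore = leftStore;
        }

        public void setRightStore(BackupStore rightStore)
        {
          this.rightStore = rightStore;
        }

    With separate fields, the runtime null checks in setup() become unnecessary because a DAG with an unset store would fail validation before deployment.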



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45945545
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,343 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on the constraint and emits the joined value.
    + * Subclasses should provide implementations of the createOutputTuple, copyValue, getKeyValue and getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key fields, one for each stream. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time fields, one for each stream. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = true;
    +      processTuple(tuple);
    +    }
    +  };
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = false;
    +      processTuple(tuple);
    +    }
    +  };
    +  // Stores for each of the input port
    +  private BackupStore[] store = (BackupStore[])Array.newInstance(BackupStore.class, 2);
    +  private String includeFieldStr;
    +  private String keyFieldStr;
    +  private String timeFieldStr;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store[0] == null) {
    +      throw new RuntimeException("Left Store is Empty");
    +    }
    +    if (store[1] == null) {
    +      throw new RuntimeException("Right Store is Empty");
    +    }
    +    // Checks whether the strategy is outer join and set it to store
    +    boolean isOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[0].isOuterJoin(isOuter);
    +    isOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
    +    store[1].isOuterJoin(isOuter);
    +    // Setup the stores
    +    store[0].setup();
    +    store[1].setup();
    +
    +    populateFields();
    +  }
    +
    +  /**
    +   * Create the event from the given tuple. If the event is successfully inserted into the store,
    +   * then the join operation is performed.
    +   *
    +   * @param tuple Tuple to process
    +   */
    +  protected void processTuple(T tuple)
    +  {
    +    int idx = 0;
    +    if (!isLeft) {
    +      idx = 1;
    +    }
    +    TimeEvent t = createEvent(tuple);
    +    if (store[idx].put(t)) {
    +      join(t, isLeft);
    +    }
    +  }
    +
    +  private void populateFields()
    +  {
    +    populateIncludeFields();
    +    populateKeyFields();
    +    if (timeFieldStr != null) {
    +      populateTimeFields();
    +    }
    +  }
    +
    +  /**
    +   * Populate the fields from the includeFieldStr
    +   */
    +  private void populateIncludeFields()
    +  {
    +    includeFields = new String[2][];
    +    String[] portFields = includeFieldStr.split(";");
    --- End diff --
    
    Do you want to add a check that portFields has exactly 2 elements?
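
    For example, something along these lines (illustrative only), which also documents the expected "Field1,Field2;Field3,Field4" layout:

        private void populateIncludeFields()
        {
          includeFields = new String[2][];
          // includeFieldStr holds one comma separated group of fields per input port,
          // with the two groups separated by ';', e.g. "Field1,Field2;Field3,Field4"
          String[] portFields = includeFieldStr.split(";");
          if (portFields.length != 2) {
            throw new IllegalArgumentException(
                "includeFieldStr must contain exactly two ';' separated groups: " + includeFieldStr);
          }
          for (int i = 0; i < portFields.length; i++) {
            includeFields[i] = portFields[i].split(",");
          }
        }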



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45945151
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/Bucket.java ---
    @@ -0,0 +1,92 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +
    +/**
    + * <p>
    + * This is the base implementation of bucket which contains all the events which belong to the same bucket.
    + * </p>
    + *
    + * @param <T> type of bucket events
    + * @since 2.2.0
    --- End diff --
    
    Remove the @since tag.



[GitHub] incubator-apex-malhar pull request: MLHR-1720 Implemented Inmemory...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r46123515
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,343 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on the constraint and emits the joined value.
    + * Subclasses should provide implementations of the createOutputTuple, copyValue, getKeyValue and getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key fields, one for each stream. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time fields, one for each stream. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = true;
    +      processTuple(tuple);
    +    }
    +  };
    +  @InputPortFieldAnnotation(optional = true)
    +  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
    +  {
    +    @Override
    +    public void process(T tuple)
    +    {
    +      isLeft = false;
    +      processTuple(tuple);
    +    }
    +  };
    +  // Stores for each of the input port
    +  private BackupStore[] store = (BackupStore[])Array.newInstance(BackupStore.class, 2);
    +  private String includeFieldStr;
    +  private String keyFieldStr;
    +  private String timeFieldStr;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store[0] == null) {
    --- End diff --
    
    Okay. Updated as per the comment.



[GitHub] incubator-apex-malhar pull request: MLHR-1720 Implemented Inmemory...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r49049967
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/JoinStore.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.Component;
    +
    +/**
    + * <p>
    + * Interface of store for join operation.
    + * </p>
    + */
    +@InterfaceStability.Unstable
    +public interface JoinStore extends Component
    +{
    +  /**
    +   * Generate the store
    +   */
    +
    +  /**
    +   * Perform the committed operation
    +   * @param windowId
    +   */
    +  void committed(long windowId);
    +
    +  /**
    +   * Save the state of store
    +   * @param windowId
    +   */
    +  void checkpointed(long windowId);
    +
    +  /**
    +   * Add any operations needed for the store before the window begins
    +   * @param windowId
    +   */
    +  void beginWindow(long windowId);
    +
    +  /**
    +   *
    +   */
    +  void endWindow();
    +
    +  /**
    +   * Get the key from the given tuple and, with that key, get the tuples which satisfy the join constraint
    +   * from the store.
    +   *
    +   * @param tuple Given tuple
    +   * @return the valid tuples which satisfy the join constraint
    +   */
    +  List<?> getValidTuples(Object tuple);
    +
    +  /**
    +   * Insert the given tuple
    +   *
    +   * @param tuple Given tuple
    +   */
    +  Boolean put(Object tuple);
    --- End diff --
    
    Why not a primitive boolean?



[GitHub] incubator-apex-malhar pull request: MLHR-1720 Implemented Inmemory...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r49052838
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/JoinStore.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.Component;
    +
    +/**
    + * <p>
    + * Interface of store for join operation.
    + * </p>
    + */
    +@InterfaceStability.Unstable
    +public interface JoinStore extends Component
    +{
    +  /**
    +   * Generate the store
    +   */
    +
    +  /**
    +   * Perform the committed operation
    +   * @param windowId
    +   */
    +  void committed(long windowId);
    +
    +  /**
    +   * Save the state of store
    +   * @param windowId
    +   */
    +  void checkpointed(long windowId);
    +
    +  /**
    +   * Add any operations needed for the store before the window begins
    +   * @param windowId
    +   */
    +  void beginWindow(long windowId);
    +
    +  /**
    +   *
    +   */
    +  void endWindow();
    +
    +  /**
    +   * Get the key from the given tuple and, with that key, get the tuples which satisfy the join constraint
    +   * from the store.
    +   *
    +   * @param tuple Given tuple
    +   * @return the valid tuples which satisfy the join constraint
    +   */
    +  List<?> getValidTuples(Object tuple);
    +
    +  /**
    +   * Insert the given tuple
    +   *
    +   * @param tuple Given tuple
    +   */
    +  Boolean put(Object tuple);
    --- End diff --
    
    Return type has to be a primitive. 



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45865154
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/BackupStore.java ---
    @@ -0,0 +1,76 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +/**
    + * <p>
    + * Interface of store for join operation.
    + * </p>
    + */
    +@Evolving
    +public interface BackupStore
    --- End diff --
    
    Should the interface just be named Store? If this is the backup, which one is the primary?



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45864632
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/BackupStore.java ---
    @@ -0,0 +1,76 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +/**
    + * <p>
    + * Interface of store for join operation.
    + * </p>
    + */
    +@Evolving
    +public interface BackupStore
    +{
    +  /**
    +   * Generate the store
    +   */
    +  void setup();
    +
    +  void committed(long windowId);
    +
    +  void checkpointed(long windowId);
    +
    +  void endWindow();
    --- End diff --
    
    You should also have a beginWindow() call.
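
    For reference, the interface with that addition might look like this (a sketch only; the later JoinStore revision in this thread does add beginWindow):

        public interface BackupStore
        {
          void setup();

          // called at the start of each streaming window, mirroring the operator lifecycle
          void beginWindow(long windowId);

          void endWindow();

          void checkpointed(long windowId);

          void committed(long windowId);
        }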



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45859918
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    --- End diff --
    
    Wrong license header.



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r45860258
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    + * applies the join operation based on the constraint and emits the joined value.
    + * Subclasses should provide implementations of the createOutputTuple, copyValue, getKeyValue and getTime methods.
    + *
    + * <b>Properties:</b><br>
    + * <b>expiryTime</b>: Expiry time for stored tuples<br>
    + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
    + *                         Ex: Field1,Field2;Field3,Field4<br>
    + * <b>keyFields</b>: List of comma separated key fields, one for each stream. Ex: Field1,Field2<br>
    + * <b>timeFields</b>: List of comma separated time fields, one for each stream. Ex: Field1,Field2<br>
    + * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
    + * <b>strategy</b>: Type of join operation. Default type is inner join<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract Join Operator
    + * @tags join
    + * @since 3.3.0
    + */
    +@Evolving
    +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
    +{
    +  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    +  protected String[] timeFields;
    +
    +  protected String[][] includeFields;
    +  // Fields to compare from both the streams
    +  protected String[] keys;
    +  // Strategy of Join operation, by default the option is inner join
    +  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    +  // This represents whether the processing tuple is from left port or not
    +  protected Boolean isLeft;
    +  @InputPortFieldAnnotation(optional = true)
    --- End diff --
    
    Why are the input ports optional? You need at least two streams to do a join.
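
    If both inputs are always required, the annotation could be flipped (or simply dropped), for example (a sketch only, shown for one port):

        @InputPortFieldAnnotation(optional = false)
        public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
        {
          @Override
          public void process(T tuple)
          {
            isLeft = true;
            processTuple(tuple);
          }
        };

    so that the platform rejects a DAG in which one of the join inputs is left unconnected.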



[GitHub] incubator-apex-malhar pull request: MLHR-1720 Implemented Inmemory...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r49053263
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/JoinStore.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.Component;
    +
    +/**
    + * <p>
    + * Interface of store for join operation.
    + * </p>
    + */
    +@InterfaceStability.Unstable
    +public interface JoinStore extends Component
    +{
    +  /**
    +   * Generate the store
    +   */
    +
    +  /**
    +   * Perform the committed operation
    +   * @param windowId
    +   */
    +  void committed(long windowId);
    +
    +  /**
    +   * Save the state of store
    +   * @param windowId
    +   */
    +  void checkpointed(long windowId);
    +
    +  /**
    +   * Add any operations needed for the store before the window begins
    +   * @param windowId
    +   */
    +  void beginWindow(long windowId);
    +
    +  /**
    +   *
    +   */
    +  void endWindow();
    +
    +  /**
    +   * Get the key from the given tuple and, with that key, get the tuples which satisfy the join constraint
    +   * from the store.
    +   *
    +   * @param tuple Given tuple
    +   * @return the valid tuples which satisfy the join constraint
    +   */
    +  List<?> getValidTuples(Object tuple);
    +
    +  /**
    +   * Insert the given tuple
    +   *
    +   * @param tuple Given tuple
    +   */
    +  Boolean put(Object tuple);
    --- End diff --
    
    @chaithu14 why not return boolean instead of Boolean?



[GitHub] incubator-apex-malhar pull request: SPOI-4520 Implemented Inmemory...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/113#discussion_r46110800
  
    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.ArrayList;
    +import java.util.Calendar;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * <p>
    + * This is the base implementation of join operator. Operator receives tuples from multiple streams,
    --- End diff --
    
    Yes, the implementation is for two streams. Will change the description.

