You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/09/01 18:30:54 UTC

[10/12] apex-malhar git commit: Updated algo & working on math operators

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/AbstractSqlStreamOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/AbstractSqlStreamOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/AbstractSqlStreamOperator.java
new file mode 100644
index 0000000..16f1036
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/AbstractSqlStreamOperator.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * A base implementation of a BaseOperator that is a sql stream operator. Subclasses should provide the
+ * implementation of how to process the tuples.
+ * <p>
+ * Abstract sql db input operator.
+ * <p>
+ * @displayName Abstract Sql Stream
+ * @category Stream Manipulators
+ * @tags sql operator
+ * @since 0.3.2
+ * @deprecated
+ */
+@Deprecated
+public abstract class AbstractSqlStreamOperator extends BaseOperator
+{
+  public static class InputSchema
+  {
+    public static class ColumnInfo
+    {
+      public String type;
+      public int bindIndex = 0;
+      public boolean isColumnIndex = false;
+    }
+
+    /**
+     * the name of the input "table"
+     */
+    public String name;
+    /**
+     * key is the name of the column, and value is the column's {@link ColumnInfo} (SQL type, bind index and index flag)
+     */
+    public HashMap<String, ColumnInfo> columnInfoMap = new HashMap<String, ColumnInfo>();
+
+    public InputSchema()
+    {
+    }
+
+    public InputSchema(String name)
+    {
+      this.name = name;
+    }
+
+    /**
+     * Registers a column of this input table; an earlier entry with the same name is replaced.
+     *
+     * @param columnName name of the column, used as the key in {@code columnInfoMap}
+     * @param columnType SQL type of the column
+     * @param isColumnIndex stored unchanged in the column's {@code isColumnIndex} flag
+     */
+    public void setColumnInfo(String columnName, String columnType, boolean isColumnIndex)
+    {
+      final ColumnInfo info = new ColumnInfo();
+      info.isColumnIndex = isColumnIndex;
+      info.type = columnType;
+      columnInfoMap.put(columnName, info);
+    }
+
+  }
+
+  protected String statement;
+  protected ArrayList<InputSchema> inputSchemas = new ArrayList<InputSchema>(5);
+  protected transient ArrayList<Object> bindings;
+
+  /**
+   * Input bindings port that takes an arraylist of objects.
+   */
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<ArrayList<Object>> bindingsPort = new DefaultInputPort<ArrayList<Object>>()
+  {
+    @Override
+    public void process(ArrayList<Object> tuple)
+    {
+      bindings = tuple;
+    }
+
+  };
+
+  /**
+   * Input port in1 that takes a hashmap of &lt;string,object&gt;.
+   */
+  public final transient DefaultInputPort<HashMap<String, Object>> in1 = new DefaultInputPort<HashMap<String, Object>>()
+  {
+    @Override
+    public void process(HashMap<String, Object> tuple)
+    {
+      processTuple(0, tuple);
+    }
+
+  };
+
+  /**
+   * Input port in2 that takes a hashmap of &lt;string,object&gt;.
+   */
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<HashMap<String, Object>> in2 = new DefaultInputPort<HashMap<String, Object>>()
+  {
+    @Override
+    public void process(HashMap<String, Object> tuple)
+    {
+      processTuple(1, tuple);
+    }
+
+  };
+
+  /**
+   * Input port in3 that takes a hashmap of &lt;string,object&gt;.
+   */
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<HashMap<String, Object>> in3 = new DefaultInputPort<HashMap<String, Object>>()
+  {
+    @Override
+    public void process(HashMap<String, Object> tuple)
+    {
+      processTuple(2, tuple);
+    }
+
+  };
+
+  /**
+   * Input port in4 that takes a hashmap of &lt;string,object&gt;.
+   */
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<HashMap<String, Object>> in4 = new DefaultInputPort<HashMap<String, Object>>()
+  {
+    @Override
+    public void process(HashMap<String, Object> tuple)
+    {
+      processTuple(3, tuple);
+    }
+
+  };
+
+  /**
+   * Input port in5 that takes a hashmap of &lt;string,object&gt;.
+   */
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<HashMap<String, Object>> in5 = new DefaultInputPort<HashMap<String, Object>>()
+  {
+    @Override
+    public void process(HashMap<String, Object> tuple)
+    {
+      processTuple(4, tuple);
+    }
+
+  };
+
+  /**
+   * Output result port that emits a hashmap of &lt;string,object&gt;.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<String, Object>> result = new DefaultOutputPort<HashMap<String, Object>>();
+
+  public void setStatement(String statement)
+  {
+    this.statement = statement;
+  }
+
+  public String getStatement()
+  {
+    return this.statement;
+  }
+
+  /**
+   * Registers the schema of the table fed by the given input port.
+   * <p>
+   * Ports may be configured in any order: slots of lower-numbered ports that have no schema yet are
+   * filled with {@code null}, and configuring the same port again replaces its previous schema.
+   *
+   * @param inputPortIndex zero-based index of the input port (0 for in1, 1 for in2, ...)
+   * @param inputSchema schema describing the tuples arriving on that port
+   */
+  public void setInputSchema(int inputPortIndex, InputSchema inputSchema)
+  {
+    // ArrayList.add(index, e) throws when index > size() and shifts existing entries otherwise,
+    // so pad the list first and then overwrite the slot in place
+    while (inputSchemas.size() <= inputPortIndex) {
+      inputSchemas.add(null);
+    }
+    inputSchemas.set(inputPortIndex, inputSchema);
+  }
+
+  public abstract void processTuple(int tableNum, HashMap<String, Object> tuple);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperator.java
new file mode 100644
index 0000000..7faf96d
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperator.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.Map;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.streamquery.condition.Condition;
+
+/**
+ * An implementation of BaseOperator that provides sql delete query semantic on live data stream. <br>
+ * <p>
+ * Stream rows that match the delete condition are dropped; rows that do not match it are emitted on the output port. <br>
+ * <br>
+ * <b>StateFull : NO,</b> all row data is processed in current time window. <br>
+ * <b>Partitions : Yes, </b> No Input dependency among input rows. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b> inport : </b> Input hash map(row) port, expects
+ * HashMap&lt;String,Object&gt;<br>
+ * <b> outport : </b> Output hash map(row) port, emits
+ * HashMap&lt;String,Object&gt;<br>
+ * <br>
+ * <b> Properties : </b> <br>
+ * <b> condition : </b> Condition selecting the rows to delete. <br>
+ * <b> columns : </b> Column names/aggregate functions for select. <br>
+ * <br>
+ * @displayName Delete
+ * @category Stream Manipulators
+ * @tags sql delete operator
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+public class DeleteOperator extends BaseOperator
+{
+
+  /**
+   * condition.
+   */
+  private Condition condition = null;
+
+  /**
+   * set condition.
+   */
+  public void setCondition(Condition condition)
+  {
+    this.condition = condition;
+  }
+
+  /**
+   * Input port that takes a map of &lt;string,object&gt;.
+   * <p>
+   * A row is forwarded to {@code outport} only when a condition is set and the row does NOT satisfy
+   * it; rows matching the condition are dropped (deleted). While no condition is set, nothing is
+   * emitted at all.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+  {
+
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      if ((condition != null) && (!condition.isValidRow(tuple))) {
+        outport.emit(tuple);
+      }
+    }
+  };
+
+  /**
+   * Output port emits a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DerbySqlStreamOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DerbySqlStreamOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DerbySqlStreamOperator.java
new file mode 100644
index 0000000..a55c7d7
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DerbySqlStreamOperator.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.AbstractSqlStreamOperator.InputSchema.ColumnInfo;
+
+import com.datatorrent.api.Context.OperatorContext;
+
+/**
+ * An implementation of AbstractSqlStreamOperator that provides embedded derby sql input operator.
+ * <p>
+ * @displayName Derby Sql Stream
+ * @category Stream Manipulators
+ * @tags sql, in-memory, input operator
+ * @since 0.3.2
+ * @deprecated
+ */
+@Deprecated
+public class DerbySqlStreamOperator extends AbstractSqlStreamOperator
+{
+  protected transient ArrayList<PreparedStatement> insertStatements = new ArrayList<PreparedStatement>(5);
+  protected List<String> execStmtStringList = new ArrayList<String>();
+  protected transient ArrayList<PreparedStatement> execStatements = new ArrayList<PreparedStatement>(5);
+  protected transient ArrayList<PreparedStatement> deleteStatements = new ArrayList<PreparedStatement>(5);
+  protected transient Connection db;
+
+  public void addExecStatementString(String stmt)
+  {
+    this.execStmtStringList.add(stmt);
+  }
+
+
+  /**
+   * Opens the in-memory Derby database and prepares every statement used while processing windows.
+   * <p>
+   * For each configured input schema a temporary table is declared in the SESSION schema, together
+   * with an INSERT statement (used by {@link #processTuple}) and a DELETE statement (used by
+   * {@link #endWindow}). Ports whose schema is missing or has no columns are skipped. All strings
+   * registered through {@link #addExecStatementString} are prepared as well.
+   *
+   * @param context operator context, not used here
+   * @throws RuntimeException if the Derby driver cannot be loaded or a SQL statement fails
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    System.setProperty("derby.stream.error.file", "/dev/null");
+    try {
+      Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+
+    String connUrl = "jdbc:derby:memory:MALHAR_TEMP;create=true";
+
+    try {
+      db = DriverManager.getConnection(connUrl);
+      // create the temporary tables here
+      for (int i = 0; i < inputSchemas.size(); i++) {
+        InputSchema inputSchema = inputSchemas.get(i);
+        if (inputSchema == null || inputSchema.columnInfoMap.isEmpty()) {
+          continue;
+        }
+        StringBuilder columnSpec = new StringBuilder();
+        StringBuilder columnNames = new StringBuilder();
+        StringBuilder insertQuestionMarks = new StringBuilder();
+        int j = 0;
+        for (Map.Entry<String, ColumnInfo> entry : inputSchema.columnInfoMap.entrySet()) {
+          if (j > 0) {
+            columnSpec.append(",");
+            columnNames.append(",");
+            insertQuestionMarks.append(",");
+          }
+          columnSpec.append(entry.getKey()).append(" ").append(entry.getValue().type);
+          columnNames.append(entry.getKey());
+          insertQuestionMarks.append("?");
+          // remember the 1-based position of this column's placeholder in the INSERT statement
+          entry.getValue().bindIndex = ++j;
+        }
+        String createTempTableStmt =
+            "DECLARE GLOBAL TEMPORARY TABLE SESSION." + inputSchema.name + "(" + columnSpec + ") NOT LOGGED";
+        PreparedStatement st = db.prepareStatement(createTempTableStmt);
+        try {
+          st.execute();
+        } finally {
+          // release the statement even when the table cannot be created
+          st.close();
+        }
+
+        String insertStmt = "INSERT INTO SESSION." + inputSchema.name + " (" + columnNames + ") VALUES ("
+            + insertQuestionMarks + ")";
+
+        // processTuple() looks the INSERT statement up by port index, so keep the positions aligned
+        // with null placeholders when a lower-numbered port was skipped above
+        while (insertStatements.size() < i) {
+          insertStatements.add(null);
+        }
+        insertStatements.add(db.prepareStatement(insertStmt));
+        // within this class the DELETE statements are only iterated, so they need no placeholders
+        deleteStatements.add(db.prepareStatement("DELETE FROM SESSION." + inputSchema.name));
+      }
+      for (String stmtStr : execStmtStringList) {
+        execStatements.add(db.prepareStatement(stmtStr));
+      }
+    } catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    try {
+      db.setAutoCommit(false);
+    } catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Inserts one tuple into the temporary table that backs the given input port.
+   * <p>
+   * Only keys that match a column of the port's schema are bound. Every value is bound through its
+   * string form; a {@code null} value is bound as SQL NULL.
+   *
+   * @param tableNum zero-based index of the input port the tuple arrived on
+   * @param tuple column name to value mapping of the row
+   * @throws RuntimeException if the insert fails
+   */
+  @Override
+  public void processTuple(int tableNum, HashMap<String, Object> tuple)
+  {
+    InputSchema inputSchema = inputSchemas.get(tableNum);
+
+    PreparedStatement insertStatement = insertStatements.get(tableNum);
+    try {
+      for (Map.Entry<String, Object> entry : tuple.entrySet()) {
+        ColumnInfo t = inputSchema.columnInfoMap.get(entry.getKey());
+        if (t != null && t.bindIndex != 0) {
+          Object value = entry.getValue();
+          // a missing value must not end in a NullPointerException; a null passed to a JDBC
+          // setter binds SQL NULL
+          insertStatement.setString(t.bindIndex, value == null ? null : value.toString());
+        }
+      }
+
+      insertStatement.executeUpdate();
+      insertStatement.clearParameters();
+    } catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Commits the rows inserted during the window, runs every exec statement and emits its rows on
+   * the result port, then empties the temporary tables.
+   * <p>
+   * Bindings received on {@code bindingsPort} are applied to each exec statement before it runs and
+   * are dropped once the window has been processed.
+   *
+   * @throws RuntimeException if any SQL operation fails
+   */
+  @Override
+  public void endWindow()
+  {
+    try {
+      db.commit();
+      if (bindings != null) {
+        for (int i = 0; i < bindings.size(); i++) {
+          Object binding = bindings.get(i);
+          String value = (binding == null) ? null : binding.toString();
+          for (PreparedStatement stmt : execStatements) {
+            // JDBC parameter indexes are 1-based while the bindings list is 0-based
+            stmt.setString(i + 1, value);
+          }
+        }
+      }
+
+      for (PreparedStatement stmt : execStatements) {
+        executePreparedStatement(stmt);
+      }
+      for (PreparedStatement st : deleteStatements) {
+        st.executeUpdate();
+        st.clearParameters();
+      }
+    } catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+    bindings = null;
+  }
+
+  /**
+   * Runs a query and emits every row of its result on the result port as a map from column name
+   * to value; afterwards the statement's parameters are cleared.
+   *
+   * @param statement prepared query to execute
+   * @throws SQLException if executing the query or reading its result fails
+   */
+  private void executePreparedStatement(PreparedStatement statement) throws SQLException
+  {
+    ResultSet res = statement.executeQuery();
+    try {
+      ResultSetMetaData resmeta = res.getMetaData();
+      int columnCount = resmeta.getColumnCount();
+      while (res.next()) {
+        HashMap<String, Object> resultRow = new HashMap<String, Object>();
+        // result set columns are numbered from 1
+        for (int i = 1; i <= columnCount; i++) {
+          resultRow.put(resmeta.getColumnName(i), res.getObject(i));
+        }
+        this.result.emit(resultRow);
+      }
+    } finally {
+      // release the result set even when reading or emitting a row fails
+      res.close();
+    }
+    statement.clearParameters();
+  }
+
+  /**
+   * Closes the prepared statements and the database connection.
+   * <p>
+   * Does nothing when {@link #setup} never opened a connection.
+   *
+   * @throws RuntimeException if closing a statement or the connection fails
+   */
+  @Override
+  public void teardown()
+  {
+    if (db == null) {
+      // setup() did not get as far as opening the connection
+      return;
+    }
+    try {
+      try {
+        closeStatements(insertStatements);
+        closeStatements(execStatements);
+        closeStatements(deleteStatements);
+        if (!db.getAutoCommit()) {
+          // endWindow() runs queries and deletes after its commit, which leaves a transaction
+          // open; Connection.close() on an active transaction is implementation-defined, so end
+          // the transaction explicitly first
+          db.rollback();
+        }
+      } finally {
+        db.close();
+      }
+    } catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Closes every non-null statement of the given list; a {@code null} list is ignored.
+   */
+  private static void closeStatements(List<PreparedStatement> statements) throws SQLException
+  {
+    if (statements == null) {
+      return;
+    }
+    for (PreparedStatement statement : statements) {
+      if (statement != null) {
+        statement.close();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByHavingOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByHavingOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByHavingOperator.java
new file mode 100644
index 0000000..9999429
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByHavingOperator.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.condition.HavingCondition;
+import org.apache.apex.malhar.contrib.misc.streamquery.function.FunctionIndex;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+import com.datatorrent.lib.streamquery.condition.Condition;
+@Deprecated
+public class GroupByHavingOperator extends BaseOperator
+{
+
+  /**
+   * aggregate indexes.
+   */
+  private ArrayList<FunctionIndex> aggregates = new ArrayList<FunctionIndex>();
+
+  /**
+   * Column, Group by names
+   */
+  private ArrayList<ColumnIndex> columnGroupIndexes = new ArrayList<ColumnIndex>();
+
+  /**
+   * where condition.
+   */
+  private Condition condition;
+
+  /**
+   * having aggregate condition;
+   */
+  private ArrayList<HavingCondition> havingConditions = new ArrayList<HavingCondition>();
+
+  /**
+   * Table rows.
+   */
+  private ArrayList<Map<String, Object>> rows = new ArrayList<Map<String, Object>>();
+
+  public void addAggregateIndex(@NotNull FunctionIndex index)
+  {
+    aggregates.add(index);
+  }
+
+  public void addColumnGroupByIndex(@NotNull ColumnIndex index)
+  {
+    columnGroupIndexes.add(index);
+  }
+
+  public void addHavingCondition(@NotNull HavingCondition condition)
+  {
+    havingConditions.add(condition);
+  }
+
+  /**
+   * @param condition condition
+   */
+  public void setCondition(Condition condition)
+  {
+    this.condition = condition;
+  }
+
+  /**
+   * Input port that takes a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+  {
+
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      if ((condition != null) && (!condition.isValidRow(tuple))) {
+        return;
+      }
+      rows.add(tuple);
+    }
+  };
+
+  /**
+   * Output port that emits a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
+
+  /**
+   * Groups the rows collected during the window by the group-by columns, computes the aggregates
+   * of every group and emits one result row per group that satisfies all having conditions.
+   * <p>
+   * Nothing is emitted when no group-by column is configured. The collected rows are discarded
+   * whenever this method returns, so rows never carry over into the next window.
+   */
+  @Override
+  public void endWindow()
+  {
+    try {
+      // group names
+      if (columnGroupIndexes.isEmpty()) {
+        return;
+      }
+
+      // group rows
+      HashMap<MultiKeyCompare, ArrayList<Map<String, Object>>> groups =
+          new HashMap<MultiKeyCompare, ArrayList<Map<String, Object>>>();
+      for (Map<String, Object> row : rows) {
+        MultiKeyCompare key = new MultiKeyCompare();
+        for (ColumnIndex index : columnGroupIndexes) {
+          key.addCompareKey(row.get(index.getColumn()));
+        }
+        ArrayList<Map<String, Object>> subRows = groups.get(key);
+        if (subRows == null) {
+          subRows = new ArrayList<Map<String, Object>>();
+          groups.put(key, subRows);
+        }
+        subRows.add(row);
+      }
+
+      // Iterate over groups and emit aggregate values
+      for (Map.Entry<MultiKeyCompare, ArrayList<Map<String, Object>>> entry : groups.entrySet()) {
+        ArrayList<Map<String, Object>> subRows = entry.getValue();
+
+        // get result; every row of a group carries the same group-by values, so use the first
+        Map<String, Object> result = new HashMap<String, Object>();
+        for (ColumnIndex index : columnGroupIndexes) {
+          index.filter(subRows.get(0), result);
+        }
+
+        // append aggregate values; a failing aggregate is reported and left out of the result
+        for (FunctionIndex aggregate : aggregates) {
+          try {
+            aggregate.filter(subRows, result);
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+
+        // check valid having aggregate
+        boolean isValidHaving = true;
+        for (HavingCondition having : havingConditions) {
+          try {
+            isValidHaving &= having.isValidAggregate(subRows);
+          } catch (Exception e) {
+            // a failing having condition abandons the remaining groups of this window
+            e.printStackTrace();
+            return;
+          }
+        }
+        if (isValidHaving) {
+          outport.emit(result);
+        }
+      }
+    } finally {
+      // reset on every exit path, including the early returns above
+      rows = new ArrayList<Map<String, Object>>();
+    }
+  }
+
+  /**
+   * Composite group-by key: the ordered list of group-by column values of one row.
+   * <p>
+   * Two keys are equal when they have the same length and hold equal values (or {@code null} on
+   * both sides) at every position. A {@code null} entry stands for a column missing from the row.
+   */
+  @SuppressWarnings("rawtypes")
+  private class MultiKeyCompare implements Comparable
+  {
+
+    /**
+     * compare keys.
+     */
+    ArrayList<Object> compareKeys = new ArrayList<Object>();
+
+    @Override
+    public boolean equals(Object other)
+    {
+      if (this == other) {
+        return true;
+      }
+      // also rejects null; comparing against a foreign type must answer false, not throw
+      if (!(other instanceof MultiKeyCompare)) {
+        return false;
+      }
+      ArrayList<Object> otherKeys = ((MultiKeyCompare)other).compareKeys;
+      if (compareKeys.size() != otherKeys.size()) {
+        return false;
+      }
+      for (int i = 0; i < compareKeys.size(); i++) {
+        Object mine = compareKeys.get(i);
+        Object theirs = otherKeys.get(i);
+        if (mine == null ? theirs != null : !mine.equals(theirs)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      // sum of the element hashes; null entries contribute 0
+      int hashCode = 0;
+      for (int i = 0; i < compareKeys.size(); i++) {
+        Object key = compareKeys.get(i);
+        if (key != null) {
+          hashCode += key.hashCode();
+        }
+      }
+      return hashCode;
+    }
+
+    /**
+     * Returns 0 when the other key is equal to this one and -1 otherwise; this only distinguishes
+     * equal from unequal keys and does not define an ordering.
+     */
+    @Override
+    public int compareTo(Object other)
+    {
+      if (this.equals(other)) {
+        return 0;
+      }
+      return -1;
+    }
+
+    /**
+     * Add compare key.
+     */
+    public void addCompareKey(Object value)
+    {
+      compareKeys.add(value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperator.java
new file mode 100644
index 0000000..d3e11c3
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperator.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.streamquery.condition.Condition;
+import com.datatorrent.lib.streamquery.index.Index;
+
+/**
+ * An implementation of Operator that reads table row data from two table data input ports. <br>
+ * <p>
+ * Operator joins row on given condition and selected names, emits
+ * joined result at output port.
+ *  <br>
+ *  <b>StateFull : Yes,</b> Operator aggregates input over application window. <br>
+ *  <b>Partitions : No, </b> will yield wrong result(s). <br>
+ *  <br>
+ *  <b>Ports : </b> <br>
+ *  <b> inport1 : </b> Input port for table 1, expects HashMap&lt;String, Object&gt; <br>
+ *  <b> inport2 : </b> Input port for table 2, expects HashMap&lt;String, Object&gt; <br>
+ *  <b> outport : </b> Output joined row port, emits HashMap&lt;String, ArrayList&lt;Object&gt;&gt; <br>
+ *  <br>
+ *  <b> Properties : </b>
+ *  <b> joinCondition : </b> Join condition for table rows. <br>
+ *  <b> table1Columns : </b> Columns to be selected from table1. <br>
+ *  <b> table2Columns : </b> Columns to be selected from table2. <br>
+ *  <br>
+ * @displayName Inner join
+ * @category Stream Manipulators
+ * @tags sql, inner join operator
+ *
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+@OperatorAnnotation(partitionable = false)
+public class InnerJoinOperator implements Operator
+{
+
+  /**
+   * Join Condition;
+   */
+  protected Condition joinCondition;
+
+  /**
+   * Table1 select columns.
+   */
+  private ArrayList<Index> table1Columns = new ArrayList<Index>();
+
+  /**
+   * Table2 select columns.
+   */
+  private ArrayList<Index> table2Columns = new ArrayList<Index>();
+
+  /**
+   * Collect data rows from input port 1.
+   */
+  protected ArrayList<Map<String, Object>> table1;
+
+  /**
+   * Collect data from input port 2.
+   */
+  protected ArrayList<Map<String, Object>> table2;
+
+  /**
+   * Input port 1 that takes a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport1 = new DefaultInputPort<Map<String, Object>>()
+  {
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      table1.add(tuple);
+      for (int j = 0; j < table2.size(); j++) {
+        if ((joinCondition == null) || (joinCondition.isValidJoin(tuple, table2.get(j)))) {
+          joinRows(tuple, table2.get(j));
+        }
+      }
+    }
+  };
+
+  /**
+   * Input port 2 that takes a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport2 = new DefaultInputPort<Map<String, Object>>()
+  {
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      table2.add(tuple);
+      for (int j = 0; j < table1.size(); j++) {
+        if ((joinCondition == null) || (joinCondition.isValidJoin(table1.get(j), tuple))) {
+          joinRows(table1.get(j), tuple);
+        }
+      }
+    }
+  };
+
+  /**
+   * Output port that emits a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> outport =
+      new DefaultOutputPort<Map<String, Object>>();
+
+  @Override
+  public void setup(OperatorContext arg0)
+  {
+    table1 = new ArrayList<Map<String, Object>>();
+    table2 = new ArrayList<Map<String, Object>>();
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  @Override
+  public void beginWindow(long arg0)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+    table1.clear();
+    table2.clear();
+  }
+
+  /**
+   * @return the joinCondition
+   */
+  public Condition getJoinCondition()
+  {
+    return joinCondition;
+  }
+
+  /**
+   * Pick the supported condition. Currently only equal join is supported.
+   * @param joinCondition joinCondition
+   */
+  public void setJoinCondition(Condition joinCondition)
+  {
+    this.joinCondition = joinCondition;
+  }
+
+  /**
+   * Select table1 column name.
+   */
+  public void selectTable1Column(Index column)
+  {
+    table1Columns.add(column);
+  }
+
+  /**
+   * Select table2 column name.
+   */
+  public void selectTable2Column(Index column)
+  {
+    table2Columns.add(column);
+  }
+
+  /**
+   * Join row from table1 and table2.
+   * <p>
+   * Builds a new output row by applying every selected table1 column index to {@code row1} and
+   * every selected table2 column index to {@code row2}, then emits the row on {@code outport}.
+   * A {@code null} row is skipped; the output row is emitted even when it ends up empty.
+   *
+   * @param row1 row from table 1, may be {@code null}
+   * @param row2 row from table 2, may be {@code null}
+   */
+  protected void joinRows(Map<String, Object> row1, Map<String, Object> row2)
+  {
+    // joined row
+    Map<String, Object> join = new HashMap<String, Object>();
+
+    // filter table1 columns
+    if (row1 != null) {
+      for (Index index: table1Columns) {
+        index.filter(row1, join);
+      }
+    }
+
+    // filter table2 columns
+    if (row2 != null) {
+      for (Index index: table2Columns) {
+        index.filter(row2, join);
+      }
+    }
+
+    // emit row
+    outport.emit(join);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperator.java
new file mode 100644
index 0000000..c7a5b25
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperator.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.Unifier;
+
+/**
+ *  An implementation of Operator that provides sql order by operator semantic over live stream data. <br>
+ * <p>
+ * Input data rows are ordered by order rules, ordered result is emitted on output port. <br>
+ * <br>
+ *  *  <br>
+ *  <b>StateFull : Yes,</b> Operator aggregates input over application window. <br>
+ *  <b>Partitions : Yes, </b> This operator is also unifier on output port. <br>
+ *  <br>
+ * <b>Ports</b>:<br>
+ * <b> inport : </b> Input hash map(row) port, expects HashMap&lt;String,Object&gt;<<br>
+ * <b> outport : </b> Output hash map(row) port, emits  HashMap&lt;String,Object&gt;<br>
+ * <br>
+ * <b> Properties : </b> <br>
+ * <b> orderByRules : </b>List of order by rules for tuples.
+ * @displayName OrderBy
+ * @category Stream Manipulators
+ * @tags orderby operator
+ * @since 0.3.5
+ * @deprecated
+ */
+@Deprecated
+public class OrderByOperator implements Operator, Unifier<Map<String, Object>>
+{
+  /**
+   * Order by rules.
+   */
+  ArrayList<OrderByRule<?>> orderByRules = new ArrayList<OrderByRule<?>>();
+
+  /**
+   * Descending flag.
+   */
+  private boolean isDescending;
+
+  /**
+   * collected rows.
+   */
+  private ArrayList<Map<String, Object>> rows;
+
+  /**
+   * Add order by rule.
+   */
+  public void addOrderByRule(OrderByRule<?> rule)
+  {
+    orderByRules.add(rule);
+  }
+
+  /**
+   * @return isDescending
+   */
+  public boolean isDescending()
+  {
+    return isDescending;
+  }
+
+  /**
+   * @param isDescending isDescending
+   */
+  public void setDescending(boolean isDescending)
+  {
+    this.isDescending = isDescending;
+  }
+
+  @Override
+  public void process(Map<String, Object> tuple)
+  {
+    rows.add(tuple);
+  }
+
+  @Override
+  public void beginWindow(long arg0)
+  {
+    rows = new ArrayList<Map<String, Object>>();
+  }
+
+  @Override
+  public void endWindow()
+  {
+    for (int i = 0; i < orderByRules.size(); i++) {
+      rows = orderByRules.get(i).sort(rows);
+    }
+    if (isDescending) {
+      for (int i = 0; i < rows.size(); i++) {
+        outport.emit(rows.get(i));
+      }
+    } else {
+      for (int i = rows.size() - 1; i >= 0; i--) {
+        outport.emit(rows.get(i));
+      }
+    }
+  }
+
+  @Override
+  public void setup(OperatorContext arg0)
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void teardown()
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  /**
+   * Input port that takes a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+  {
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      rows.add(tuple);
+    }
+  };
+
+  /**
+   * Output port that emits a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>()
+  {
+    @Override
+    public Unifier<Map<String, Object>> getUnifier()
+    {
+      OrderByOperator unifier = new OrderByOperator();
+      for (int i = 0; i < getOrderByRules().size(); i++) {
+        unifier.addOrderByRule(getOrderByRules().get(i));
+      }
+      unifier.setDescending(isDescending);
+      return unifier;
+    }
+  };
+
+  /**
+   * @return the orderByRules
+   */
+  public ArrayList<OrderByRule<?>> getOrderByRules()
+  {
+    return orderByRules;
+  }
+
+  /**
+   * The order by rules used to order incoming tuples.
+   * @param oredrByRules the orderByRules to set
+   */
+  public void setOrderByRules(ArrayList<OrderByRule<?>> oredrByRules)
+  {
+    this.orderByRules = oredrByRules;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByRule.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByRule.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByRule.java
new file mode 100644
index 0000000..cc90354
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByRule.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Implements order by key name rule. <br>
+ * <p>
+ * Rows are ordered by the natural ordering of the value stored under a single column name. <br>
+ * <b>Properties : </b> <br>
+ * <b> columnName : </b> Name of column for ordering tuples. <br>
+ * @displayName OrderBy Rule
+ * @category Stream Manipulators
+ * @tags orderby, sort, comparison
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+@SuppressWarnings("rawtypes")
+public class OrderByRule<T extends Comparable>
+{
+
+  /**
+   * column name for ordering tuples.
+   */
+  private String columnName;
+
+  /**
+   * @param name name of the column whose value is used as the sort key
+   */
+  public OrderByRule(String name)
+  {
+    this.columnName = name;
+  }
+
+  /**
+   * Returns the given rows ordered by ascending value of the rule's column.
+   * Rows that share a key keep their relative input order; rows that do not contain the
+   * column at all are left out of the result. The input list is not modified.
+   *
+   * @param rows rows to order
+   * @return a new list holding the ordered rows
+   */
+  @SuppressWarnings("unchecked")
+  public ArrayList<Map<String, Object>> sort(ArrayList<Map<String, Object>> rows)
+  {
+    // bucket the rows by key; the TreeMap keeps the buckets in ascending key order
+    TreeMap<T, ArrayList<Map<String, Object>>> buckets = new TreeMap<T, ArrayList<Map<String, Object>>>();
+    for (Map<String, Object> candidate : rows) {
+      if (!candidate.containsKey(columnName)) {
+        continue;
+      }
+      T key = (T)candidate.get(columnName);
+      ArrayList<Map<String, Object>> bucket = buckets.get(key);
+      if (bucket == null) {
+        bucket = new ArrayList<Map<String, Object>>();
+        buckets.put(key, bucket);
+      }
+      bucket.add(candidate);
+    }
+
+    // flatten the buckets in key order
+    ArrayList<Map<String, Object>> ordered = new ArrayList<Map<String, Object>>();
+    for (ArrayList<Map<String, Object>> bucket : buckets.values()) {
+      ordered.addAll(bucket);
+    }
+    return ordered;
+  }
+
+  /**
+   * @return the columnName
+   */
+  public String getColumnName()
+  {
+    return this.columnName;
+  }
+
+  /**
+   * @param columnName
+   *          the columnName to set
+   */
+  public void setColumnName(String columnName)
+  {
+    this.columnName = columnName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OuterJoinOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OuterJoinOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OuterJoinOperator.java
new file mode 100644
index 0000000..9a8fde8
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OuterJoinOperator.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+/**
+ * An operator that provides sql left,right and full outer join metric semantics on live stream. <br>
+ * <p>
+ * At the end of every window the rows of the outer side(s) that found no join partner are
+ * emitted with the other side left empty. Please refer to {@link InnerJoinOperator} for
+ * details.
+ *
+ * <b> Properties : </b> <br>
+ * <b> isLeftJoin : </b> Left join flag. <br>
+ * <b> isFullJoin : </b> Full join flag. <br>
+ * @displayName Outer Join
+ * @category Stream Manipulators
+ * @tags sql, outer join operator
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class OuterJoinOperator extends InnerJoinOperator
+{
+
+  /** True for a left outer join, false for a right outer join; ignored when isFullJoin is set. */
+  private boolean isLeftJoin = true;
+  /** True for a full outer join; takes precedence over isLeftJoin. */
+  private boolean isFullJoin = false;
+
+  /**
+   * Emits the unmatched rows of the outer side(s) and then lets the parent finish the window.
+   */
+  @Override
+  public void endWindow()
+  {
+    if (isFullJoin) {
+      // full outer join: unmatched rows of both sides
+      emitUnmatchedTable1Rows();
+      emitUnmatchedTable2Rows();
+    } else if (isLeftJoin) {
+      emitUnmatchedTable1Rows();
+    } else {
+      emitUnmatchedTable2Rows();
+    }
+    // This override used to skip the parent's end-of-window handling, so the buffered
+    // tables were never released and kept growing from window to window.
+    // NOTE(review): assumes InnerJoinOperator.endWindow() only clears table1/table2 - confirm.
+    super.endWindow();
+  }
+
+  /**
+   * Emits every table1 row that joins with no table2 row, with the table2 side absent.
+   */
+  private void emitUnmatchedTable1Rows()
+  {
+    for (int i = 0; i < table1.size(); i++) {
+      if (!hasTable2Match(i)) {
+        joinRows(table1.get(i), null);
+      }
+    }
+  }
+
+  /**
+   * Emits every table2 row that joins with no table1 row, with the table1 side absent.
+   */
+  private void emitUnmatchedTable2Rows()
+  {
+    for (int i = 0; i < table2.size(); i++) {
+      if (!hasTable1Match(i)) {
+        joinRows(null, table2.get(i));
+      }
+    }
+  }
+
+  /**
+   * Tells whether the table1 row at the given index joins with at least one table2 row.
+   * Without a join condition every pair of rows counts as a match.
+   */
+  private boolean hasTable2Match(int table1Index)
+  {
+    for (int j = 0; j < table2.size(); j++) {
+      if ((joinCondition == null)
+          || (joinCondition.isValidJoin(table1.get(table1Index), table2.get(j)))) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Tells whether the table2 row at the given index joins with at least one table1 row.
+   * Without a join condition every pair of rows counts as a match.
+   */
+  private boolean hasTable1Match(int table2Index)
+  {
+    for (int j = 0; j < table1.size(); j++) {
+      if ((joinCondition == null)
+          || (joinCondition.isValidJoin(table1.get(j), table2.get(table2Index)))) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Selects left outer join (the default).
+   */
+  public void setLeftJoin()
+  {
+    isLeftJoin = true;
+  }
+
+  /**
+   * Selects right outer join.
+   */
+  public void setRightJoin()
+  {
+    isLeftJoin = false;
+  }
+
+  /**
+   * Selects right outer join. Kept under its original, misspelled name for existing callers;
+   * behaves exactly like {@link #setRightJoin()}.
+   */
+  public void setRighttJoin()
+  {
+    setRightJoin();
+  }
+
+  /**
+   * @return true when the operator performs a full outer join
+   */
+  public boolean isFullJoin()
+  {
+    return isFullJoin;
+  }
+
+  /**
+   * @param isFullJoin true to perform a full outer join, which overrides the left/right setting
+   */
+  public void setFullJoin(boolean isFullJoin)
+  {
+    this.isFullJoin = isFullJoin;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectFunctionOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectFunctionOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectFunctionOperator.java
new file mode 100644
index 0000000..6e8008b
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectFunctionOperator.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.function.FunctionIndex;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+
+/**
+ *  An implementation of Operator that applies the configured sql functions to the rows collected
+ *  in a window and emits one result row. <br>
+ * <p>
+ * <b>StateFull : Yes,</b> Operator aggregates input over application window. <br>
+ * <b>Partitions : No, </b> will yield wrong result(s). <br>
+ * <br>
+ * <b>Ports : </b> <br>
+ * <b>inport : </b> expects a map of &lt;string,object&gt; (row). <br>
+ * <b>outport : </b> emits a map of &lt;string,object&gt; holding the function results. <br>
+ * <br>
+ * <b> Properties : </b> <br>
+ * <b> functions : </b> Sql function for rows. <br>
+ * @displayName Select Function
+ * @category Stream Manipulators
+ * @tags sql top, sql limit, sql select operator
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+@OperatorAnnotation(partitionable = false)
+public class SelectFunctionOperator implements Operator
+{
+  /**
+   * Rows received during the current window.
+   */
+  private ArrayList<Map<String, Object>> rows;
+
+  /**
+   * Functions evaluated over the collected rows at end of window.
+   */
+  private ArrayList<FunctionIndex> functions = new ArrayList<FunctionIndex>();
+
+  /**
+   * Input port that takes a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+  {
+
+    @Override
+    public void process(Map<String, Object> row)
+    {
+      // only buffer here; the functions run once per window in endWindow
+      rows.add(row);
+    }
+  };
+
+  /**
+   * No setup work is required.
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    // intentionally a no-op
+  }
+
+  /**
+   * No teardown work is required.
+   */
+  @Override
+  public void teardown()
+  {
+    // intentionally a no-op
+  }
+
+  /**
+   * Starts the window with an empty row buffer.
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    rows = new ArrayList<Map<String, Object>>();
+  }
+
+  /**
+   * Runs every registered function over the buffered rows and emits the combined result.
+   * Nothing is emitted when no function is registered or when any function throws.
+   */
+  @Override
+  public void endWindow()
+  {
+    if (functions.isEmpty()) {
+      return;
+    }
+    Map<String, Object> result = new HashMap<String, Object>();
+    for (int i = 0; i < functions.size(); i++) {
+      try {
+        functions.get(i).filter(rows, result);
+      } catch (Exception e) {
+        // NOTE(review): the failure is only printed and the window's output is dropped;
+        // consider reporting it through a logger instead.
+        e.printStackTrace();
+        return;
+      }
+    }
+    outport.emit(result);
+  }
+
+  /**
+   * Output port that emits a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
+
+  /**
+   * Add sql function.
+   * @param function  Sql function for rows.
+   */
+  public void addSqlFunction(FunctionIndex function)
+  {
+    this.functions.add(function);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperator.java
new file mode 100644
index 0000000..08799cb
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperator.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.streamquery.condition.Condition;
+import com.datatorrent.lib.streamquery.index.Index;
+
+/**
+ * An implementation of that provides sql select query semantics on live data stream. <br>
+ * <p>
+ * Stream rows passing condition are emitted on output port stream. <br>
+ * <br>
+ * <b>StateFull : NO,</b> all row data is processed in current time window. <br>
+ * <b>Partitions : Yes, </b> No Input dependency among input rows. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b> inport : </b> Input hash map(row) port, expects
+ * HashMap&lt;String,Object&gt;<<br>
+ * <b> outport : </b> Output hash map(row) port, emits
+ * HashMap&lt;String,Object&gt;<br>
+ * <br>
+ * <b> Properties : <b> <br>
+ * <b> condition : </b> Select condition for selecting rows. <br>
+ * <b> columns : </b> Column names/aggregate functions for select. <br>
+ * <br>
+ * @displayName Select
+ * @category Stream Manipulators
+ * @tags sql select operator, index, sql condition
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+public class SelectOperator extends BaseOperator
+{
+
+  /**
+   * select columns/expression;
+   */
+  private ArrayList<Index> indexes = new ArrayList<Index>();
+
+  /**
+   * condition.
+   */
+  private Condition condition = null;
+
+  /**
+   * add index.
+   */
+  public void addIndex(Index index)
+  {
+    indexes.add(index);
+  }
+
+  /**
+   * set condition.
+   */
+  public void setCondition(Condition condition)
+  {
+    this.condition = condition;
+  }
+
+  /**
+   * Input port that takes a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+  {
+
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      if ((condition != null) && (!condition.isValidRow(tuple))) {
+        return;
+      }
+      if (indexes.size() == 0) {
+        outport.emit(tuple);
+        return;
+      }
+      Map<String, Object> result = new HashMap<String, Object>();
+      for (int i = 0; i < indexes.size(); i++) {
+        indexes.get(i).filter(tuple, result);
+      }
+      outport.emit(result);
+    }
+  };
+
+  /**
+   * Output port that emits a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperator.java
new file mode 100644
index 0000000..2880198
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperator.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+
+/**
+ * An implementation of Operator that provides sql top select query semantic on live data stream. <br>
+ * <p>
+ * The first rows received in each window, up to the configured count or percentage, are
+ * emitted on output port stream at the end of the window. <br>
+ * <br>
+ * <b>StateFull : NO,</b> all row data is processed in current time window. <br>
+ * <b>Partitions : Yes, </b> No Input dependency among input rows. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b> inport : </b> Input hash map(row) port, expects
+ * HashMap&lt;String,Object&gt;<br>
+ * <b> outport : </b> Output hash map(row) port, emits
+ * HashMap&lt;String,Object&gt;<br>
+ * <br>
+ * <b> Properties : </b> <br>
+ * <b> topValue : </b> top values count. <br>
+ * <b> isPercentage : </b> top values count is percentage flag.
+ * <br>
+ * @displayName Select Top
+ * @category Stream Manipulators
+ * @tags sql select, sql top operator
+ *  @since 0.3.4
+ *  @deprecated
+ */
+@Deprecated
+public class SelectTopOperator implements Operator
+{
+  /** Rows received during the current window, in arrival order. */
+  private ArrayList<Map<String, Object>> list;
+  /** Number of rows to emit, or a percentage of the window's rows when isPercentage is set. */
+  private int topValue = 1;
+  /** When true, topValue is interpreted as a percentage of the rows in the window. */
+  private boolean isPercentage = false;
+
+  /**
+   * Input port that takes a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+  {
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      list.add(tuple);
+    }
+  };
+
+  /**
+   * No setup work is required.
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    // intentionally a no-op
+  }
+
+  /**
+   * No teardown work is required.
+   */
+  @Override
+  public void teardown()
+  {
+    // intentionally a no-op
+  }
+
+  /**
+   * Starts the window with an empty row buffer.
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    list = new ArrayList<Map<String, Object>>();
+  }
+
+  /**
+   * Emits the leading rows of the window: topValue rows, or topValue percent of the
+   * buffered rows (rounded down) when the percentage flag is set.
+   */
+  @Override
+  public void endWindow()
+  {
+    int numEmits = topValue;
+    if (isPercentage) {
+      // Multiply before dividing: the former (topValue / 100) was an integer division that
+      // truncated to 0 for every percentage below 100, so nothing was ever emitted.
+      // The product is computed as long to avoid int overflow and capped at the list size.
+      long share = (long)list.size() * topValue / 100;
+      numEmits = (int)Math.min(share, (long)list.size());
+    }
+    for (int i = 0; (i < numEmits) && (i < list.size()); i++) {
+      outport.emit(list.get(i));
+    }
+  }
+
+  /**
+   * @return the configured top value (row count or percentage)
+   */
+  public int getTopValue()
+  {
+    return topValue;
+  }
+
+  /**
+   * Sets the number (or percentage) of rows to emit per window.
+   *
+   * @param topValue row count or percentage, must be greater than zero
+   * @throws Exception an {@link IllegalArgumentException} when topValue is zero or negative
+   */
+  public void setTopValue(int topValue) throws Exception
+  {
+    if (topValue <= 0) {
+      throw new IllegalArgumentException("Top value must be positive number.");
+    }
+    this.topValue = topValue;
+  }
+
+  /**
+   * @return true when the top value is interpreted as a percentage
+   */
+  public boolean isPercentage()
+  {
+    return isPercentage;
+  }
+
+  /**
+   * @param isPercentage true to interpret the top value as a percentage of the window's rows
+   */
+  public void setPercentage(boolean isPercentage)
+  {
+    this.isPercentage = isPercentage;
+  }
+
+  /**
+   * Output port that emits a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> outport =  new DefaultOutputPort<Map<String, Object>>();
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperator.java
new file mode 100644
index 0000000..52ddac6
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperator.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.streamquery.condition.Condition;
+
+/**
+ *  An implementation of BaseOperator that provides sql update query semantic on live data stream. <br>
+ *  <p>
+ *  Stream rows passing condition are emitted on output port stream. <br>
+ *  <br>
+ *  <b>StateFull : NO,</b> all row data is processed in current time window. <br>
+ *  <b>Partitions : Yes, </b> No Input dependency among input rows. <br>
+ *  <br>
+ * <b>Ports</b>:<br>
+ * <b> inport : </b> Input hash map(row) port, expects HashMap&lt;String,Object&gt;<<br>
+ * <b> outport : </b> Output hash map(row) port, emits  HashMap&lt;String,Object&gt;<br>
+ * <br>
+ * <b> Properties : <b> <br>
+ * <b> condition : </b> Select condition for selecting rows. <br>
+ * <b> columns : </b> Column names/aggregate functions for select. <br>
+ * <br>
+ * @displayName Update
+ * @category Stream Manipulators
+ * @tags sql update operator, sql condition
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+public class UpdateOperator extends BaseOperator
+{
+  /**
+   * Update value map.
+   */
+  Map<String, Object> updates = new HashMap<String, Object>();
+
+  /**
+   *  condition.
+   */
+  private Condition condition = null;
+
+  /**
+   * set condition.
+   */
+  public void setCondition(Condition condition)
+  {
+    this.condition = condition;
+  }
+
+  /**
+   * Input port that takes a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+  {
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      if ((condition != null) && (!condition.isValidRow(tuple))) {
+        return;
+      }
+      if (updates.size() == 0) {
+        outport.emit(tuple);
+        return;
+      }
+      Map<String, Object> result = new HashMap<String, Object>();
+      for (Map.Entry<String, Object> entry : tuple.entrySet()) {
+        if (updates.containsKey(entry.getKey())) {
+          result.put(entry.getKey(), updates.get(entry.getKey()));
+        } else {
+          result.put(entry.getKey(), entry.getValue());
+        }
+      }
+      outport.emit(result);
+    }
+  };
+
+  /**
+   * Output port that emits a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> outport =  new DefaultOutputPort<Map<String, Object>>();
+
+  /**
+   * Add update value.
+   */
+  public void addUpdate(String name, Object value)
+  {
+    updates.put(name, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/BetweenCondition.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/BetweenCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/BetweenCondition.java
new file mode 100644
index 0000000..155470c
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/BetweenCondition.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.condition;
+
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+
+/**
+ *  A derivation of Condition that validates row by checking if the given column name value lies between given left,right range. <br>
+ * <p>
+ * <b>Properties : </b> <br>
+ * <b> column : </b> Name of column. <br>
+ * <b> leftValue : </b> left range of column value. <br>
+ * <b> rightValue : </b> right range od column value. <br>
+ * <br>
+ * @displayName Between Condition
+ * @category Stream Manipulators
+ * @tags sql condition
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class BetweenCondition extends Condition
+{
+  /**
+   * Column name to be checked.
+   */
+  @NotNull
+  private String column;
+
+  /**
+   * Left range value.
+   */
+  @NotNull
+  private Object leftValue;
+
+  /**
+   * Right range value.
+   */
+  @NotNull
+  private Object rightValue;
+
+  /**
+   * @param  column  Name of column, must be non null. <br>
+   * @param  leftValue  Left range for value, mut be non null. <br>
+   * @param  rightValue  right range for value, mut be non null. <br>
+   */
+  public BetweenCondition(@NotNull String column, @NotNull  Object leftValue, @NotNull Object rightValue)
+  {
+    this.column = column;
+    this.leftValue = leftValue;
+    this.rightValue = rightValue;
+  }
+
+  /**
+   * Validate given row.
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public boolean isValidRow(@NotNull Map<String, Object> row)
+  {
+    if (!row.containsKey(column)) {
+      return false;
+    }
+    Object value = row.get(column);
+    if (value == null) {
+      return false;
+    }
+    if (((Comparable)value).compareTo((Comparable)leftValue) < 0) {
+      return false;
+    }
+    if (((Comparable)value).compareTo((Comparable)rightValue) > 0) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Must not be called.
+   */
+  @Override
+  public boolean isValidJoin(@NotNull Map<String, Object> row1, Map<String, Object> row2)
+  {
+    assert (false);
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/CompoundCondition.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/CompoundCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/CompoundCondition.java
new file mode 100644
index 0000000..d606991
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/CompoundCondition.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.condition;
+
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+
+/**
+ * A derivation of Condition that combines two row conditions with a logical OR or AND. <br>
+ * <p>
+ * The operator is OR unless AND is selected through the three argument constructor or
+ * setLogicalAnd(). A compound condition can itself be an operand, so nested and/or
+ * expressions can be built by chaining.
+ * <br>
+ * <b> Properties : </b> <br>
+ * <b> leftCondition : </b> Left operand, evaluated first. <br>
+ * <b> rightCondition : </b> Right operand, evaluated only when the left one does not decide. <br>
+ * <b> logicalOr : </b> true for OR (the default), false for AND. <br>
+ * @displayName Compound Condition
+ * @category Stream Manipulators
+ * @tags sql condition, logical
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class CompoundCondition extends Condition
+{
+  /** Left operand of the expression. */
+  @NotNull
+  private Condition leftCondition;
+
+  /** Right operand of the expression. */
+  @NotNull
+  private Condition rightCondition;
+
+  /** Operator flag: true combines the operands with OR, false with AND. */
+  private boolean logicalOr = true;
+
+  /**
+   * Creates a compound condition whose operands are combined with logical OR.
+   *
+   * @param leftCondition  Left validate row condition, must be non null. <br>
+   * @param rightCondition Right validate row condition, must be non null. <br>
+   */
+  public CompoundCondition(Condition leftCondition, Condition rightCondition)
+  {
+    this(leftCondition, rightCondition, false);
+  }
+
+  /**
+   * Creates a compound condition with an explicitly chosen operator.
+   *
+   * @param leftCondition  Left validate row condition, must be non null. <br>
+   * @param rightCondition Right validate row condition, must be non null. <br>
+   * @param isLogicalAnd   Logical AND if true, logical OR otherwise.
+   */
+  public CompoundCondition(Condition leftCondition, Condition rightCondition, boolean isLogicalAnd)
+  {
+    this.logicalOr = !isLogicalAnd;
+    this.leftCondition = leftCondition;
+    this.rightCondition = rightCondition;
+  }
+
+  /**
+   * Applies the configured operator to both operands with short-circuit evaluation.
+   */
+  @Override
+  public boolean isValidRow(Map<String, Object> row)
+  {
+    if (leftCondition.isValidRow(row)) {
+      // OR is already satisfied; AND still depends on the right operand
+      return logicalOr || rightCondition.isValidRow(row);
+    }
+    // left operand failed: OR falls back to the right operand, AND is rejected
+    return logicalOr && rightCondition.isValidRow(row);
+  }
+
+  /**
+   * Join validation is not implemented for compound conditions; always returns false.
+   */
+  @Override
+  public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> row2)
+  {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
+  public Condition getLeftCondition()
+  {
+    return leftCondition;
+  }
+
+  public void setLeftCondition(Condition leftCondition)
+  {
+    this.leftCondition = leftCondition;
+  }
+
+  public Condition getRightCondition()
+  {
+    return rightCondition;
+  }
+
+  public void setRightCondition(Condition rightCondition)
+  {
+    this.rightCondition = rightCondition;
+  }
+
+  /** Switches the operator to logical AND. */
+  public void setLogicalAnd()
+  {
+    logicalOr = false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/EqualValueCondition.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/EqualValueCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/EqualValueCondition.java
new file mode 100644
index 0000000..a54960d
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/EqualValueCondition.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.condition;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+
+/**
+ * An implementation of condition on column equality.
+ * <p>
+ * A valid row must contain every column of the equal map with an equal value.
+ *
+ * <b> Properties : </b> <br>
+ *  <b> equalMap : </b> Column equal value map store.
+ * @displayName Equal Value Condition
+ * @category Stream Manipulators
+ * @tags sql condition
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class EqualValueCondition extends Condition
+{
+
+  /**
+   * Equal column map: column name to the value a row must carry in that column.
+   */
+  private HashMap<String, Object> equalMap = new HashMap<String, Object>();
+
+  /**
+   * Add column equal condition; a later call for the same column replaces the earlier value.
+   */
+  public void addEqualValue(String column, Object value)
+  {
+    equalMap.put(column, value);
+  }
+
+  /**
+   * Check valid row.
+   *
+   * @param row column name to value map of the row under test.
+   * @return true if no condition was added, or if the row contains every configured column
+   *         with an equal value; a null expectation is satisfied only by a null row value.
+   */
+  @Override
+  public boolean isValidRow(Map<String, Object> row)
+  {
+    // compare each condition value; with no conditions the loop is skipped and the row passes
+    for (Map.Entry<String, Object> entry : equalMap.entrySet()) {
+      if (!row.containsKey(entry.getKey())) {
+        return false;
+      }
+      Object expected = entry.getValue();
+      Object value = row.get(entry.getKey());
+      if (expected == null) {
+        if (value != null) {
+          return false;
+        }
+        // this column matches; keep going so the remaining columns are checked as well
+        // (returning true here would let later mismatches slip through)
+        continue;
+      }
+      // a null row value can never equal a non-null expectation
+      if (value == null || !expected.equals(value)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * check valid join, not implemented
+   *
+   * @return false
+   */
+  @Override
+  public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> row2)
+  {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCompareValue.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCompareValue.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCompareValue.java
new file mode 100644
index 0000000..6e0400b
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCompareValue.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.condition;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.function.FunctionIndex;
+
+/**
+ *  A derivation of HavingCondition that implements comparison of aggregate index value to input compare value. <br>
+ * <p>
+ * Compare value must implement interface Comparable. The check is evaluated as
+ * compareValue.compareTo(aggregate) and only the sign of that result is used. <br>
+ * <b> Properties : </b>
+ *  <b> compareValue : </b>  Value to be compared. <br>
+ *  <b>  compareType : </b> Type of comparison -1 == lt, 0 == eq, 1 == gt. <br>
+ * @displayName Having Compare Value
+ * @category Stream Manipulators
+ * @tags compare, sql condition
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+@SuppressWarnings("rawtypes")
+public class HavingCompareValue<T extends Comparable> extends HavingCondition
+{
+  /**
+   * Value to be compared.
+   */
+  private T compareValue;
+
+  /**
+   * Type of comparison -1 == lt, 0 == eq, 1 == gt.
+   */
+  private int compareType;
+
+  /**
+   * @param aggregateIndex   aggregate index for comparison. <br>
+   * @param compareValue     Value to be compared. <br>
+   * @param compareType    Type of comparison -1 == lt, 0 == eq, 1 == gt. <br>
+   */
+  public HavingCompareValue(FunctionIndex aggregateIndex, T compareValue, int compareType)
+  {
+    super(aggregateIndex);
+    this.compareValue = compareValue;
+    this.compareType  = compareType;
+  }
+
+  /**
+   * Validate aggregate override. compareTo only guarantees the sign of its result, so
+   * the result is normalised to -1/0/1 before it is matched against compareType. <br>
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public boolean isValidAggregate(@NotNull ArrayList<Map<String, Object>> rows) throws Exception
+  {
+    Object computed = aggregateIndex.compute(rows);
+    return compareType == Integer.signum(compareValue.compareTo(computed));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCondition.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCondition.java
new file mode 100644
index 0000000..66ef35c
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCondition.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.condition;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.function.FunctionIndex;
+
+/**
+ *  Abstract base for Group,Having operator conditions tied to an aggregate index.&nbsp; Subclasses provide the
+ *  implementation to check if aggregate is valid.
+ * <p>
+ * @displayName Having Condition
+ * @category Stream Manipulators
+ * @tags sql condition, index, group
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public abstract class HavingCondition
+{
+  /**
+   * Aggregate index to be validated; accessible to subclasses.
+   */
+  protected FunctionIndex aggregateIndex;
+
+  /**
+   * @param aggregateIndex Aggregate index to be validated; stored as given.
+   */
+  public HavingCondition(FunctionIndex aggregateIndex)
+  {
+    this.aggregateIndex = aggregateIndex;
+  }
+
+  /**
+   * Decides whether the given rows pass this having condition.
+   */
+  public abstract boolean isValidAggregate(@NotNull ArrayList<Map<String, Object>> rows) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/InCondition.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/InCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/InCondition.java
new file mode 100644
index 0000000..d19bb99
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/InCondition.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.condition;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+
+/**
+ * A condition that accepts a row when the value of one column is a member of a configured set.
+ * <p>
+ * <br>
+ * <b>Properties : </b> <br>
+ * <b> column : </b> Column name for which value is checked in values set. <br>
+ * <b> inValues : </b> Set of values in which column value is checked. <br>
+ * @displayName In Condition
+ * @category Stream Manipulators
+ * @tags sql condition
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class InCondition extends Condition
+{
+  /**
+   * Name of the column whose value is looked up in the values set.
+   */
+  @NotNull
+  private String column;
+
+  /**
+   * Accepted values; starts out empty and is filled through addInValue.
+   */
+  private Set<Object> inValues = new HashSet<Object>();
+
+  /**
+   * @param column Column name for which value is checked in values set.
+   */
+  public InCondition(@NotNull String column)
+  {
+    this.column = column;
+  }
+
+  /** A row is valid when it contains the column and the column's value is in the set. */
+  @Override
+  public boolean isValidRow(@NotNull Map<String, Object> row)
+  {
+    // the value is only fetched and looked up when the column is present
+    final boolean hasColumn = row.containsKey(column);
+    return hasColumn && inValues.contains(row.get(column));
+  }
+
+  /** Join validation is not supported by this condition; always returns false. */
+  @Override
+  public boolean isValidJoin(@NotNull Map<String, Object> row1, @NotNull Map<String, Object> row2)
+  {
+    return false;
+  }
+
+  public String getColumn()
+  {
+    return column;
+  }
+
+  public void setColumn(String column)
+  {
+    this.column = column;
+  }
+
+  public void addInValue(Object value)
+  {
+    inValues.add(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/LikeCondition.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/LikeCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/LikeCondition.java
new file mode 100644
index 0000000..a8789fa
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/LikeCondition.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.condition;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+
+/**
+ * An implementation of condition class to filter rows for which given column name value matches given regular expression. <br>
+ *<p>
+ *<b> Properties : </b> <br>
+ *<b> column : </b> Column to be matched with regular expression. <br>
+ *<b> pattern : </b> Regular expression pattern.<br>
+ * @displayName Like Condition
+ * @category Stream Manipulators
+ * @tags sql, like condition, regular expression
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class LikeCondition extends Condition
+{
+  /**
+   * Column to be matched with regular expression.
+   */
+  @NotNull
+  private String column;
+
+  /**
+   * Regular expression pattern.
+   */
+  @NotNull
+  private Pattern pattern;
+
+  /**
+   * @param column Column to be matched with regular expression, must be non-null.
+   * @param pattern Regular expression pattern, must be non-null.
+   */
+  public LikeCondition(@NotNull String column, @NotNull String pattern)
+  {
+    setColumn(column);
+    setPattern(pattern);
+  }
+
+  /**
+   * For valid row column value string must contain a match of the regular expression;
+   * a row without the column, or with a null value in it, is not valid.
+   * @return row valid status.
+   */
+  @Override
+  public boolean isValidRow(Map<String, Object> row)
+  {
+    Object value = row.get(column);
+    if (value == null) {
+      // absent column or null value: nothing to match (Pattern.matcher(null) would throw)
+      return false;
+    }
+    Matcher match = pattern.matcher((CharSequence)value);
+    return match.find();
+  }
+
+  /** Must not be called. */
+  @Override
+  public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> row2)
+  {
+    assert (false);
+    return false;
+  }
+
+  public String getColumn()
+  {
+    return column;
+  }
+
+  public void setColumn(String column)
+  {
+    this.column = column;
+  }
+
+  public void setPattern(String pattern)
+  {
+    this.pattern = Pattern.compile(pattern);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/AverageFunction.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/AverageFunction.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/AverageFunction.java
new file mode 100644
index 0000000..72ac59f
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/AverageFunction.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.function;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * An implementation of function index that implements average function semantics. <br>
+ * <p>
+ *   e.g : sql => SELECT AVG(column_name) FROM table_name. <br>
+ *   <br>
+ *   <b> Properties : </b> <br>
+ *   <b> column : </b> Aggregate over given column values.   <br>
+ *   <b> alias  : </b> Alias name for aggregate output. <br>
+ * @displayName Average Function
+ * @category Stream Manipulators
+ * @tags sql average
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class AverageFunction extends FunctionIndex
+{
+  /**
+   * @param column Aggregate over given column values, must be non null.
+   * @param alias  Alias name for aggregate output.
+   */
+  public AverageFunction(@NotNull String column, String alias)
+  {
+    super(column, alias);
+  }
+
+  /**
+   * Computes the arithmetic mean of the column over all rows, using double arithmetic.
+   *
+   * @param rows rows to aggregate; every row must hold a non-null Number in the column.
+   * @return the mean as a Double, or 0.0 when rows is empty.
+   */
+  @Override
+  public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception
+  {
+    final int count = rows.size();
+    double total = 0.0;
+    for (int i = 0; i < count; i++) {
+      total += ((Number)rows.get(i).get(column)).doubleValue();
+    }
+    return count == 0 ? 0.0 : total / count;
+  }
+
+  /**
+   * Get aggregate name.
+   * @return the alias when one is set, otherwise "AVG(" followed by the column name and ")".
+   */
+  @Override
+  protected String aggregateName()
+  {
+    // an empty or null alias falls back to the generated SQL style name
+    final boolean hasAlias = !StringUtils.isEmpty(alias);
+    return hasAlias ? alias : "AVG(" + column + ")";
+  }
+}