You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/09/01 18:30:54 UTC
[10/12] apex-malhar git commit: Updated algo & working on math
operators
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/AbstractSqlStreamOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/AbstractSqlStreamOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/AbstractSqlStreamOperator.java
new file mode 100644
index 0000000..16f1036
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/AbstractSqlStreamOperator.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * A base implementation of a BaseOperator that is a sql stream operator. Subclasses should provide the
+ implementation of how to process the tuples.
+ * <p>
+ * Abstract sql db input operator.
+ * <p>
+ * @displayName Abstract Sql Stream
+ * @category Stream Manipulators
+ * @tags sql operator
+ * @since 0.3.2
+ * @deprecated
+ */
+@Deprecated
+public abstract class AbstractSqlStreamOperator extends BaseOperator
+{
+ public static class InputSchema
+ {
+ public static class ColumnInfo
+ {
+ public String type;
+ public int bindIndex = 0;
+ public boolean isColumnIndex = false;
+ }
+
+ /**
+ * the name of the input "table"
+ */
+ public String name;
+ /**
+ * key is the name of the column, and value is the SQL type
+ */
+ public HashMap<String, ColumnInfo> columnInfoMap = new HashMap<String, ColumnInfo>();
+
+ public InputSchema()
+ {
+ }
+
+ public InputSchema(String name)
+ {
+ this.name = name;
+ }
+
+ public void setColumnInfo(String columnName, String columnType, boolean isColumnIndex)
+ {
+ ColumnInfo t = new ColumnInfo();
+ t.type = columnType;
+ t.isColumnIndex = isColumnIndex;
+ columnInfoMap.put(columnName, t);
+ }
+
+ }
+
+ protected String statement;
+ protected ArrayList<InputSchema> inputSchemas = new ArrayList<InputSchema>(5);
+ protected transient ArrayList<Object> bindings;
+
+ /**
+ * Input bindings port that takes an arraylist of objects.
+ */
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<ArrayList<Object>> bindingsPort = new DefaultInputPort<ArrayList<Object>>()
+ {
+ @Override
+ public void process(ArrayList<Object> tuple)
+ {
+ bindings = tuple;
+ }
+
+ };
+
+ /**
+ * Input port in1 that takes a hashmap of <string,object>.
+ */
+ public final transient DefaultInputPort<HashMap<String, Object>> in1 = new DefaultInputPort<HashMap<String, Object>>()
+ {
+ @Override
+ public void process(HashMap<String, Object> tuple)
+ {
+ processTuple(0, tuple);
+ }
+
+ };
+
+ /**
+ * Input port in2 that takes a hashmap of <string,object>.
+ */
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<HashMap<String, Object>> in2 = new DefaultInputPort<HashMap<String, Object>>()
+ {
+ @Override
+ public void process(HashMap<String, Object> tuple)
+ {
+ processTuple(1, tuple);
+ }
+
+ };
+
+ /**
+ * Input port in3 that takes a hashmap of <string,object>.
+ */
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<HashMap<String, Object>> in3 = new DefaultInputPort<HashMap<String, Object>>()
+ {
+ @Override
+ public void process(HashMap<String, Object> tuple)
+ {
+ processTuple(2, tuple);
+ }
+
+ };
+
+ /**
+ * Input port in4 that takes a hashmap of <string,object>.
+ */
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<HashMap<String, Object>> in4 = new DefaultInputPort<HashMap<String, Object>>()
+ {
+ @Override
+ public void process(HashMap<String, Object> tuple)
+ {
+ processTuple(3, tuple);
+ }
+
+ };
+
+ /**
+ * Input port in5 that takes a hashmap of <string,object>.
+ */
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<HashMap<String, Object>> in5 = new DefaultInputPort<HashMap<String, Object>>()
+ {
+ @Override
+ public void process(HashMap<String, Object> tuple)
+ {
+ processTuple(4, tuple);
+ }
+
+ };
+
+ /**
+ * Output result port that emits a hashmap of <string,object>.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<HashMap<String, Object>> result = new DefaultOutputPort<HashMap<String, Object>>();
+
+ public void setStatement(String statement)
+ {
+ this.statement = statement;
+ }
+
+ public String getStatement()
+ {
+ return this.statement;
+ }
+
+ public void setInputSchema(int inputPortIndex, InputSchema inputSchema)
+ {
+ inputSchemas.add(inputPortIndex, inputSchema);
+ }
+
+ public abstract void processTuple(int tableNum, HashMap<String, Object> tuple);
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperator.java
new file mode 100644
index 0000000..7faf96d
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperator.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.Map;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.streamquery.condition.Condition;
+
+/**
+ * An implementation of BaseOperator that provides sql delete query semantic on live data stream. <br>
+ * <p>
+ * Stream rows passing condition are emitted on output port stream. <br>
+ * <br>
+ * <b>StateFull : NO,</b> all row data is processed in current time window. <br>
+ * <b>Partitions : Yes, </b> No Input dependency among input rows. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b> inport : </b> Input hash map(row) port, expects
+ * HashMap<String,Object><<br>
+ * <b> outport : </b> Output hash map(row) port, emits
+ * HashMap<String,Object><br>
+ * <br>
+ * <b> Properties : <b> <br>
+ * <b> condition : </b> Select condition for selecting rows. <br>
+ * <b> columns : </b> Column names/aggregate functions for select. <br>
+ * <br>
+ * @displayName Delete
+ * @category Stream Manipulators
+ * @tags sql delete operator
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+public class DeleteOperator extends BaseOperator
+{
+
+ /**
+ * condition.
+ */
+ private Condition condition = null;
+
+ /**
+ * set condition.
+ */
+ public void setCondition(Condition condition)
+ {
+ this.condition = condition;
+ }
+
+ /**
+ * Input port that takes a map of <string,object>.
+ */
+ public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+ {
+
+ @Override
+ public void process(Map<String, Object> tuple)
+ {
+ if ((condition != null) && (!condition.isValidRow(tuple))) {
+ outport.emit(tuple);
+ }
+ }
+ };
+
+ /**
+ * Output port emits a map of <string,object>.
+ */
+ public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DerbySqlStreamOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DerbySqlStreamOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DerbySqlStreamOperator.java
new file mode 100644
index 0000000..a55c7d7
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DerbySqlStreamOperator.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.AbstractSqlStreamOperator.InputSchema.ColumnInfo;
+
+import com.datatorrent.api.Context.OperatorContext;
+
+/**
+ * An implementation of AbstractSqlStreamOperator that provides embedded derby sql input operator.
+ * <p>
+ * @displayName Derby Sql Stream
+ * @category Stream Manipulators
+ * @tags sql, in-memory, input operator
+ * @since 0.3.2
+ * @deprecated
+ */
+@Deprecated
+public class DerbySqlStreamOperator extends AbstractSqlStreamOperator
+{
+ protected transient ArrayList<PreparedStatement> insertStatements = new ArrayList<PreparedStatement>(5);
+ protected List<String> execStmtStringList = new ArrayList<String>();
+ protected transient ArrayList<PreparedStatement> execStatements = new ArrayList<PreparedStatement>(5);
+ protected transient ArrayList<PreparedStatement> deleteStatements = new ArrayList<PreparedStatement>(5);
+ protected transient Connection db;
+
+ public void addExecStatementString(String stmt)
+ {
+ this.execStmtStringList.add(stmt);
+ }
+
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ System.setProperty("derby.stream.error.file", "/dev/null");
+ try {
+ Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+
+ String connUrl = "jdbc:derby:memory:MALHAR_TEMP;create=true";
+ PreparedStatement st;
+
+ try {
+ db = DriverManager.getConnection(connUrl);
+ // create the temporary tables here
+ for (int i = 0; i < inputSchemas.size(); i++) {
+ InputSchema inputSchema = inputSchemas.get(i);
+ if (inputSchema == null || inputSchema.columnInfoMap.isEmpty()) {
+ continue;
+ }
+ String columnSpec = "";
+ String columnNames = "";
+ String insertQuestionMarks = "";
+ int j = 0;
+ for (Map.Entry<String, ColumnInfo> entry : inputSchema.columnInfoMap.entrySet()) {
+ if (!columnSpec.isEmpty()) {
+ columnSpec += ",";
+ columnNames += ",";
+ insertQuestionMarks += ",";
+ }
+ columnSpec += entry.getKey();
+ columnSpec += " ";
+ columnSpec += entry.getValue().type;
+ columnNames += entry.getKey();
+ insertQuestionMarks += "?";
+ entry.getValue().bindIndex = ++j;
+ }
+ String createTempTableStmt =
+ "DECLARE GLOBAL TEMPORARY TABLE SESSION." + inputSchema.name + "(" + columnSpec + ") NOT LOGGED";
+ st = db.prepareStatement(createTempTableStmt);
+ st.execute();
+ st.close();
+
+ String insertStmt = "INSERT INTO SESSION." + inputSchema.name + " (" + columnNames + ") VALUES ("
+ + insertQuestionMarks + ")";
+
+ insertStatements.add(i, db.prepareStatement(insertStmt));
+ deleteStatements.add(i, db.prepareStatement("DELETE FROM SESSION." + inputSchema.name));
+ }
+ for (String stmtStr : execStmtStringList) {
+ execStatements.add(db.prepareStatement(stmtStr));
+ }
+ } catch (SQLException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ try {
+ db.setAutoCommit(false);
+ } catch (SQLException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public void processTuple(int tableNum, HashMap<String, Object> tuple)
+ {
+ InputSchema inputSchema = inputSchemas.get(tableNum);
+
+ PreparedStatement insertStatement = insertStatements.get(tableNum);
+ try {
+ for (Map.Entry<String, Object> entry : tuple.entrySet()) {
+ ColumnInfo t = inputSchema.columnInfoMap.get(entry.getKey());
+ if (t != null && t.bindIndex != 0) {
+ insertStatement.setString(t.bindIndex, entry.getValue().toString());
+ }
+ }
+
+ insertStatement.executeUpdate();
+ insertStatement.clearParameters();
+ } catch (SQLException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ try {
+ db.commit();
+ if (bindings != null) {
+ for (int i = 0; i < bindings.size(); i++) {
+ for (PreparedStatement stmt : execStatements) {
+ stmt.setString(i, bindings.get(i).toString());
+ }
+ }
+ }
+
+ for (PreparedStatement stmt : execStatements) {
+ executePreparedStatement(stmt);
+ }
+ for (PreparedStatement st : deleteStatements) {
+ st.executeUpdate();
+ st.clearParameters();
+ }
+ } catch (SQLException ex) {
+ throw new RuntimeException(ex);
+ }
+ bindings = null;
+ }
+
+ private void executePreparedStatement(PreparedStatement statement) throws SQLException
+ {
+ ResultSet res = statement.executeQuery();
+ ResultSetMetaData resmeta = res.getMetaData();
+ int columnCount = resmeta.getColumnCount();
+ while (res.next()) {
+ HashMap<String, Object> resultRow = new HashMap<String, Object>();
+ for (int i = 1; i <= columnCount; i++) {
+ resultRow.put(resmeta.getColumnName(i), res.getObject(i));
+ }
+ this.result.emit(resultRow);
+ }
+ statement.clearParameters();
+ }
+
+ @Override
+ public void teardown()
+ {
+ try {
+ db.close();
+ } catch (SQLException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByHavingOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByHavingOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByHavingOperator.java
new file mode 100644
index 0000000..9999429
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByHavingOperator.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.condition.HavingCondition;
+import org.apache.apex.malhar.contrib.misc.streamquery.function.FunctionIndex;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+import com.datatorrent.lib.streamquery.condition.Condition;
+@Deprecated
+public class GroupByHavingOperator extends BaseOperator
+{
+
+ /**
+ * aggregate indexes.
+ */
+ private ArrayList<FunctionIndex> aggregates = new ArrayList<FunctionIndex>();
+
+ /**
+ * Column, Group by names
+ */
+ private ArrayList<ColumnIndex> columnGroupIndexes = new ArrayList<ColumnIndex>();
+
+ /**
+ * where condition.
+ */
+ private Condition condition;
+
+ /**
+ * having aggregate condtion;
+ */
+ private ArrayList<HavingCondition> havingConditions = new ArrayList<HavingCondition>();
+
+ /**
+ * Table rows.
+ */
+ private ArrayList<Map<String, Object>> rows = new ArrayList<Map<String, Object>>();
+
+ public void addAggregateIndex(@NotNull FunctionIndex index)
+ {
+ aggregates.add(index);
+ }
+
+ public void addColumnGroupByIndex(@NotNull ColumnIndex index)
+ {
+ columnGroupIndexes.add(index);
+ }
+
+ public void addHavingCondition(@NotNull HavingCondition condition)
+ {
+ havingConditions.add(condition);
+ }
+
+ /**
+ * @param condition condition
+ */
+ public void setCondition(Condition condition)
+ {
+ this.condition = condition;
+ }
+
+ /**
+ * Input port that takes a map of <string,object>.
+ */
+ public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+ {
+
+ @Override
+ public void process(Map<String, Object> tuple)
+ {
+ if ((condition != null) && (!condition.isValidRow(tuple))) {
+ return;
+ }
+ rows.add(tuple);
+ }
+ };
+
+ /**
+ * Output port that emits a map of <string,object>.
+ */
+ public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
+
+ /**
+ * Create aggregate at end window.
+ */
+ @Override
+ public void endWindow()
+ {
+ // group names
+ if (columnGroupIndexes.size() == 0) {
+ rows = new ArrayList<Map<String, Object>>();
+ return;
+ }
+
+ // group rows
+ HashMap<MultiKeyCompare, ArrayList<Map<String, Object>>> groups = new HashMap<MultiKeyCompare, ArrayList<Map<String, Object>>>();
+ for (Map<String, Object> row : rows) {
+ MultiKeyCompare key = new MultiKeyCompare();
+ for (ColumnIndex index : columnGroupIndexes) {
+ key.addCompareKey(row.get(index.getColumn()));
+ }
+ ArrayList<Map<String, Object>> subRows;
+ if (groups.containsKey(key)) {
+ subRows = groups.get(key);
+ } else {
+ subRows = new ArrayList<Map<String, Object>>();
+ groups.put(key, subRows);
+ }
+ subRows.add(row);
+ }
+
+ // Iterate over groups and emit aggregate values
+ for (Map.Entry<MultiKeyCompare, ArrayList<Map<String, Object>>> entry : groups
+ .entrySet()) {
+ ArrayList<Map<String, Object>> subRows = entry.getValue();
+
+ // get result
+ Map<String, Object> result = new HashMap<String, Object>();
+ for (ColumnIndex index : columnGroupIndexes) {
+ index.filter(subRows.get(0), result);
+ }
+
+ // append aggregate values
+ for (FunctionIndex aggregate : aggregates) {
+ try {
+ aggregate.filter(subRows, result);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ // check valid having aggregate
+ boolean isValidHaving = true;
+ for (HavingCondition condition : havingConditions) {
+ try {
+ isValidHaving &= condition.isValidAggregate(subRows);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+ if (isValidHaving) {
+ outport.emit(result);
+ }
+ }
+
+ rows = new ArrayList<Map<String, Object>>();
+ }
+
+ /**
+ * multi key compare class.
+ */
+ @SuppressWarnings("rawtypes")
+ private class MultiKeyCompare implements Comparable
+ {
+
+ /**
+ * compare keys.
+ */
+ ArrayList<Object> compareKeys = new ArrayList<Object>();
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if (other instanceof MultiKeyCompare) {
+ if (compareKeys.size() != ((MultiKeyCompare)other).compareKeys.size()) {
+ return false;
+ }
+ }
+ for (int i = 0; i < compareKeys.size(); i++) {
+ if (!(compareKeys.get(i).equals(((MultiKeyCompare)other).compareKeys.get(i)))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int hashCode = 0;
+ for (int i = 0; i < compareKeys.size(); i++) {
+ hashCode += compareKeys.get(i).hashCode();
+ }
+ return hashCode;
+ }
+
+ @Override
+ public int compareTo(Object other)
+ {
+ if (this.equals(other)) {
+ return 0;
+ }
+ return -1;
+ }
+
+ /**
+ * Add compare key.
+ */
+ public void addCompareKey(Object value)
+ {
+ compareKeys.add(value);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperator.java
new file mode 100644
index 0000000..d3e11c3
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperator.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.streamquery.condition.Condition;
+import com.datatorrent.lib.streamquery.index.Index;
+
+/**
+ * An implementation of Operator that reads table row data from two table data input ports. <br>
+ * <p>
+ * Operator joins row on given condition and selected names, emits
+ * joined result at output port.
+ * <br>
+ * <b>StateFull : Yes,</b> Operator aggregates input over application window. <br>
+ * <b>Partitions : No, </b> will yield wrong result(s). <br>
+ * <br>
+ * <b>Ports : </b> <br>
+ * <b> inport1 : </b> Input port for table 1, expects HashMap<String, Object> <br>
+ * <b> inport1 : </b> Input port for table 2, expects HashMap<String, Object> <br>
+ * <b> outport : </b> Output joined row port, emits HashMap<String, ArrayList<Object>> <br>
+ * <br>
+ * <b> Properties : </b>
+ * <b> joinCondition : </b> Join condition for table rows. <br>
+ * <b> table1Columns : </b> Columns to be selected from table1. <br>
+ * <b> table2Columns : </b> Columns to be selected from table2. <br>
+ * <br>
+ * @displayName Inner join
+ * @category Stream Manipulators
+ * @tags sql, inner join operator
+ *
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+@OperatorAnnotation(partitionable = false)
+public class InnerJoinOperator implements Operator
+{
+
+ /**
+ * Join Condition;
+ */
+ protected Condition joinCondition;
+
+ /**
+ * Table1 select columns.
+ */
+ private ArrayList<Index> table1Columns = new ArrayList<Index>();
+
+ /**
+ * Table2 select columns.
+ */
+ private ArrayList<Index> table2Columns = new ArrayList<Index>();
+
+ /**
+ * Collect data rows from input port 1.
+ */
+ protected ArrayList<Map<String, Object>> table1;
+
+ /**
+ * Collect data from input port 2.
+ */
+ protected ArrayList<Map<String, Object>> table2;
+
+ /**
+ * Input port 1 that takes a map of <string,object>.
+ */
+ public final transient DefaultInputPort<Map<String, Object>> inport1 = new DefaultInputPort<Map<String, Object>>()
+ {
+ @Override
+ public void process(Map<String, Object> tuple)
+ {
+ table1.add(tuple);
+ for (int j = 0; j < table2.size(); j++) {
+ if ((joinCondition == null) || (joinCondition.isValidJoin(tuple, table2.get(j)))) {
+ joinRows(tuple, table2.get(j));
+ }
+ }
+ }
+ };
+
+ /**
+ * Input port 2 that takes a map of <string,object>.
+ */
+ public final transient DefaultInputPort<Map<String, Object>> inport2 = new DefaultInputPort<Map<String, Object>>()
+ {
+ @Override
+ public void process(Map<String, Object> tuple)
+ {
+ table2.add(tuple);
+ for (int j = 0; j < table1.size(); j++) {
+ if ((joinCondition == null) || (joinCondition.isValidJoin(table1.get(j), tuple))) {
+ joinRows(table1.get(j), tuple);
+ }
+ }
+ }
+ };
+
+ /**
+ * Output port that emits a map of <string,object>.
+ */
+ public final transient DefaultOutputPort<Map<String, Object>> outport =
+ new DefaultOutputPort<Map<String, Object>>();
+
+ @Override
+ public void setup(OperatorContext arg0)
+ {
+ table1 = new ArrayList<Map<String, Object>>();
+ table2 = new ArrayList<Map<String, Object>>();
+ }
+
+ @Override
+ public void teardown()
+ {
+ }
+
+ @Override
+ public void beginWindow(long arg0)
+ {
+ }
+
+ @Override
+ public void endWindow()
+ {
+ table1.clear();
+ table2.clear();
+ }
+
+ /**
+ * @return the joinCondition
+ */
+ public Condition getJoinCondition()
+ {
+ return joinCondition;
+ }
+
+ /**
+ * Pick the supported condition. Currently only equal join is supported.
+ * @param joinCondition joinCondition
+ */
+ public void setJoinCondition(Condition joinCondition)
+ {
+ this.joinCondition = joinCondition;
+ }
+
+ /**
+ * Select table1 column name.
+ */
+ public void selectTable1Column(Index column)
+ {
+ table1Columns.add(column);
+ }
+
+ /**
+ * Select table2 column name.
+ */
+ public void selectTable2Column(Index column)
+ {
+ table2Columns.add(column);
+ }
+
+ /**
+ * Join row from table1 and table2.
+ */
+ protected void joinRows(Map<String, Object> row1, Map<String, Object> row2)
+ {
+ // joined row
+ Map<String, Object> join = new HashMap<String, Object>();
+
+ // filter table1 columns
+ if (row1 != null) {
+ for (Index index: table1Columns) {
+ index.filter(row1, join);
+ }
+ }
+
+ // filter table1 columns
+ if (row2 != null) {
+ for (Index index: table2Columns) {
+ index.filter(row2, join);
+ }
+ }
+
+ // emit row
+ outport.emit(join);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperator.java
new file mode 100644
index 0000000..c7a5b25
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperator.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.Unifier;
+
+/**
+ * An implementation of Operator that provides sql order by operator semantic over live stream data. <br>
+ * <p>
+ * Input data rows are ordered by order rules, ordered result is emitted on output port. <br>
+ * <br>
+ * * <br>
+ * <b>StateFull : Yes,</b> Operator aggregates input over application window. <br>
+ * <b>Partitions : Yes, </b> This operator is also unifier on output port. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b> inport : </b> Input hash map(row) port, expects HashMap<String,Object><<br>
+ * <b> outport : </b> Output hash map(row) port, emits HashMap<String,Object><br>
+ * <br>
+ * <b> Properties : </b> <br>
+ * <b> orderByRules : </b>List of order by rules for tuples.
+ * @displayName OrderBy
+ * @category Stream Manipulators
+ * @tags orderby operator
+ * @since 0.3.5
+ * @deprecated
+ */
+@Deprecated
+public class OrderByOperator implements Operator, Unifier<Map<String, Object>>
+{
+ /**
+ * Order by rules.
+ */
+ ArrayList<OrderByRule<?>> orderByRules = new ArrayList<OrderByRule<?>>();
+
+ /**
+ * Descending flag.
+ */
+ private boolean isDescending;
+
+ /**
+ * collected rows.
+ */
+ private ArrayList<Map<String, Object>> rows;
+
+ /**
+ * Add order by rule.
+ */
+ public void addOrderByRule(OrderByRule<?> rule)
+ {
+ orderByRules.add(rule);
+ }
+
+ /**
+ * @return isDescending
+ */
+ public boolean isDescending()
+ {
+ return isDescending;
+ }
+
+ /**
+ * @param isDescending isDescending
+ */
+ public void setDescending(boolean isDescending)
+ {
+ this.isDescending = isDescending;
+ }
+
+ @Override
+ public void process(Map<String, Object> tuple)
+ {
+ rows.add(tuple);
+ }
+
+ @Override
+ public void beginWindow(long arg0)
+ {
+ rows = new ArrayList<Map<String, Object>>();
+ }
+
+ @Override
+ public void endWindow()
+ {
+ for (int i = 0; i < orderByRules.size(); i++) {
+ rows = orderByRules.get(i).sort(rows);
+ }
+ if (isDescending) {
+ for (int i = 0; i < rows.size(); i++) {
+ outport.emit(rows.get(i));
+ }
+ } else {
+ for (int i = rows.size() - 1; i >= 0; i--) {
+ outport.emit(rows.get(i));
+ }
+ }
+ }
+
+ @Override
+ public void setup(OperatorContext arg0)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void teardown()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /**
+ * Input port that takes a map of <string,object>.
+ */
+ public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+ {
+ @Override
+ public void process(Map<String, Object> tuple)
+ {
+ rows.add(tuple);
+ }
+ };
+
+ /**
+ * Output port that emits a map of <string,object>.
+ */
+ public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>()
+ {
+ @Override
+ public Unifier<Map<String, Object>> getUnifier()
+ {
+ OrderByOperator unifier = new OrderByOperator();
+ for (int i = 0; i < getOrderByRules().size(); i++) {
+ unifier.addOrderByRule(getOrderByRules().get(i));
+ }
+ unifier.setDescending(isDescending);
+ return unifier;
+ }
+ };
+
+ /**
+ * @return the orderByRules
+ */
+ public ArrayList<OrderByRule<?>> getOrderByRules()
+ {
+ return orderByRules;
+ }
+
+ /**
+ * The order by rules used to order incoming tuples.
+ * @param oredrByRules the orderByRules to set
+ */
+ public void setOrderByRules(ArrayList<OrderByRule<?>> oredrByRules)
+ {
+ this.orderByRules = oredrByRules;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByRule.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByRule.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByRule.java
new file mode 100644
index 0000000..cc90354
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByRule.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Implements order by key name rule. <br>
+ * <p>
+ * <b>Properties : </b> <br>
+ * <b> columnName : </b> Name of column for ordering tuples. <br>
+ * @displayName OrderBy Rule
+ * @category Stream Manipulators
+ * @tags orderby, sort, comparison
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+@SuppressWarnings("rawtypes")
+public class OrderByRule<T extends Comparable>
+{
+
+ /**
+ * column name for ordering tuples.
+ */
+ private String columnName;
+
+ public OrderByRule(String name)
+ {
+
+ columnName = name;
+ }
+
+ /**
+ * sort rows by each rule and emit result on output port.
+ */
+ @SuppressWarnings("unchecked")
+ public ArrayList<Map<String, Object>> sort(ArrayList<Map<String, Object>> rows)
+ {
+
+ TreeMap<T, ArrayList<Map<String, Object>>> sorted = new TreeMap<T, ArrayList<Map<String, Object>>>();
+ for (int i = 0; i < rows.size(); i++) {
+ Map<String, Object> row = rows.get(i);
+ if (row.containsKey(columnName)) {
+ T value = (T)row.get(columnName);
+ ArrayList<Map<String, Object>> list;
+ if (sorted.containsKey(value)) {
+ list = sorted.get(value);
+ } else {
+ list = new ArrayList<Map<String, Object>>();
+ sorted.put(value, list);
+ }
+ list.add(row);
+ }
+ }
+ ArrayList<Map<String, Object>> result = new ArrayList<Map<String, Object>>();
+ for (Map.Entry<T, ArrayList<Map<String, Object>>> entry : sorted.entrySet()) {
+ result.addAll(entry.getValue());
+ }
+ return result;
+ }
+
+ /**
+ * @return the columnName
+ */
+ public String getColumnName()
+ {
+
+ return columnName;
+ }
+
+ /**
+ * @param columnName
+ * the columnName to set
+ */
+ public void setColumnName(String columnName)
+ {
+
+ this.columnName = columnName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OuterJoinOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OuterJoinOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OuterJoinOperator.java
new file mode 100644
index 0000000..9a8fde8
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OuterJoinOperator.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+/**
+ * An operator that provides sql left,right and full outer join metric semantics on live stream. <br>
+ * <p>
+ * Please refer to {@link org.apache.apex.malhar.lib.misc.streamquery.InnerJoinOperator} for
+ * details.
+ *
+ * <b> Properties : </b> <br>
+ * <b> isLeftJoin : </b> Left join flag. <br>
+ * <b> isFullJoin : </b> Full join flag. <br>
+ * @displayName Outer Join
+ * @category Stream Manipulators
+ * @tags sql, outer join operator
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class OuterJoinOperator extends InnerJoinOperator
+{
+
+ private boolean isLeftJoin = true;
+ private boolean isFullJoin = false;
+
+ @Override
+ public void endWindow()
+ {
+ // full outer join
+ if (isFullJoin) {
+ for (int i = 0; i < table1.size(); i++) {
+ boolean merged = false;
+ for (int j = 0; j < table2.size(); j++) {
+ if ((joinCondition == null)
+ || (joinCondition.isValidJoin(table1.get(i), table2.get(j)))) {
+ merged = true;
+ }
+ }
+ if (!merged) {
+ joinRows(table1.get(i), null);
+ }
+ }
+ for (int i = 0; i < table2.size(); i++) {
+ boolean merged = false;
+ for (int j = 0; j < table1.size(); j++) {
+ if ((joinCondition == null)
+ || (joinCondition.isValidJoin(table1.get(j), table2.get(i)))) {
+ merged = true;
+ }
+ }
+ if (!merged) { // only output non merged rows
+ joinRows(null, table2.get(i));
+ }
+ }
+ return;
+ }
+
+ // left or right join
+ if (isLeftJoin) {
+ for (int i = 0; i < table1.size(); i++) {
+ boolean merged = false;
+ for (int j = 0; j < table2.size(); j++) {
+ if ((joinCondition == null)
+ || (joinCondition.isValidJoin(table1.get(i), table2.get(j)))) {
+ merged = true;
+ }
+ }
+ if (!merged) {
+ joinRows(table1.get(i), null);
+ }
+ }
+ } else {
+ for (int i = 0; i < table2.size(); i++) {
+ boolean merged = false;
+ for (int j = 0; j < table1.size(); j++) {
+ if ((joinCondition == null) || (joinCondition.isValidJoin(table1.get(j), table2.get(i)))) {
+ merged = true;
+ }
+ }
+ if (!merged) { // only output non merged rows
+ joinRows(null, table2.get(i));
+ }
+ }
+ }
+ }
+
+ public void setLeftJoin()
+ {
+ isLeftJoin = true;
+ }
+
+ public void setRighttJoin()
+ {
+ isLeftJoin = false;
+ }
+
+ public boolean isFullJoin()
+ {
+ return isFullJoin;
+ }
+
+ public void setFullJoin(boolean isFullJoin)
+ {
+ this.isFullJoin = isFullJoin;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectFunctionOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectFunctionOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectFunctionOperator.java
new file mode 100644
index 0000000..6e8008b
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectFunctionOperator.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.function.FunctionIndex;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+
+/**
+ * An implementation of Operator that applies sql top or limit semantics on incoming tuple(s). <br>
+ * <p>
+ * <b>StateFull : Yes,</b> Operator aggregates input over application window. <br>
+ * <b>Partitions : No, </b> will yield wrong result(s). <br>
+ * <br>
+ * <b>Ports : </b> <br>
+ * <b>inport : </b> expect tuple for type T. <br>
+ * <b>outport : </b> emits tuple for type T. <br>
+ * <br>
+ * <b> Properties : </b> <br>
+ * <b> functions : </b> Sql function for rows. <br>
+ * @displayName Select Function
+ * @category Stream Manipulators
+ * @tags sql top, sql limit, sql select operator
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+@OperatorAnnotation(partitionable = false)
+public class SelectFunctionOperator implements Operator
+{
+ /**
+ * array of rows.
+ */
+ private ArrayList<Map<String, Object>> rows;
+
+ /**
+ * Aggregate function for rows.
+ */
+ private ArrayList<FunctionIndex> functions = new ArrayList<FunctionIndex>();
+
+ /**
+ * Input port that takes a map of <string,object>.
+ */
+ public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+ {
+
+ @Override
+ public void process(Map<String, Object> row)
+ {
+ rows.add(row);
+ }
+ };
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void teardown()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ rows = new ArrayList<Map<String, Object>>();
+ }
+
+ @Override
+ public void endWindow()
+ {
+ if (functions.size() == 0) {
+ return;
+ }
+ Map<String, Object> collect = new HashMap<String, Object>();
+ for (FunctionIndex function : functions) {
+ try {
+ function.filter(rows, collect);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+ outport.emit(collect);
+ }
+
+ /**
+ * Output port that emits a map of <string,object>.
+ */
+ public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
+
+ /**
+ * Add sql function.
+ * @param function Sql function for rows.
+ */
+ public void addSqlFunction(FunctionIndex function)
+ {
+ functions.add(function);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperator.java
new file mode 100644
index 0000000..08799cb
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperator.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.streamquery.condition.Condition;
+import com.datatorrent.lib.streamquery.index.Index;
+
+/**
+ * An implementation of that provides sql select query semantics on live data stream. <br>
+ * <p>
+ * Stream rows passing condition are emitted on output port stream. <br>
+ * <br>
+ * <b>StateFull : NO,</b> all row data is processed in current time window. <br>
+ * <b>Partitions : Yes, </b> No Input dependency among input rows. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b> inport : </b> Input hash map(row) port, expects
+ * HashMap<String,Object><<br>
+ * <b> outport : </b> Output hash map(row) port, emits
+ * HashMap<String,Object><br>
+ * <br>
+ * <b> Properties : <b> <br>
+ * <b> condition : </b> Select condition for selecting rows. <br>
+ * <b> columns : </b> Column names/aggregate functions for select. <br>
+ * <br>
+ * @displayName Select
+ * @category Stream Manipulators
+ * @tags sql select operator, index, sql condition
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+public class SelectOperator extends BaseOperator
+{
+
+ /**
+ * select columns/expression;
+ */
+ private ArrayList<Index> indexes = new ArrayList<Index>();
+
+ /**
+ * condition.
+ */
+ private Condition condition = null;
+
+ /**
+ * add index.
+ */
+ public void addIndex(Index index)
+ {
+ indexes.add(index);
+ }
+
+ /**
+ * set condition.
+ */
+ public void setCondition(Condition condition)
+ {
+ this.condition = condition;
+ }
+
+ /**
+ * Input port that takes a map of <string,object>.
+ */
+ public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+ {
+
+ @Override
+ public void process(Map<String, Object> tuple)
+ {
+ if ((condition != null) && (!condition.isValidRow(tuple))) {
+ return;
+ }
+ if (indexes.size() == 0) {
+ outport.emit(tuple);
+ return;
+ }
+ Map<String, Object> result = new HashMap<String, Object>();
+ for (int i = 0; i < indexes.size(); i++) {
+ indexes.get(i).filter(tuple, result);
+ }
+ outport.emit(result);
+ }
+ };
+
+ /**
+ * Output port that emits a map of <string,object>.
+ */
+ public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperator.java
new file mode 100644
index 0000000..2880198
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperator.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+
+/**
+ * An implementation of Operator that provides sql top select query semantic on live data stream. <br>
+ * <p>
+ * Stream rows passing condition are emitted on output port stream. <br>
+ * <br>
+ * <b>StateFull : NO,</b> all row data is processed in current time window. <br>
+ * <b>Partitions : Yes, </b> No Input dependency among input rows. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b> inport : </b> Input hash map(row) port, expects
+ * HashMap<String,Object><<br>
+ * <b> outport : </b> Output hash map(row) port, emits
+ * HashMap<String,Object><br>
+ * <br>
+ * <b> Properties : <b> <br>
+ * <b> topValue : </b> top values count. <br>
+ * <b> isPercentage : </b> top values count is percentage flag.
+ * <br>
+ * @displayName Select Top
+ * @category Stream Manipulators
+ * @tags sql select, sql top operator
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class SelectTopOperator implements Operator
+{
+ private ArrayList<Map<String, Object>> list;
+ private int topValue = 1;
+ private boolean isPercentage = false;
+
+ /**
+ * Input port that takes a map of <string,object>.
+ */
+ public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+ {
+ @Override
+ public void process(Map<String, Object> tuple)
+ {
+ list.add(tuple);
+ }
+ };
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void teardown()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ list = new ArrayList<Map<String, Object>>();
+ }
+
+ @Override
+ public void endWindow()
+ {
+ int numEmits = topValue;
+ if (isPercentage) {
+ numEmits = list.size() * (topValue / 100);
+ }
+ for (int i = 0; (i < numEmits) && (i < list.size()); i++) {
+ outport.emit(list.get(i));
+ }
+ }
+
+ public int getTopValue()
+ {
+ return topValue;
+ }
+
+ public void setTopValue(int topValue) throws Exception
+ {
+ if (topValue <= 0) {
+ throw new Exception("Top value must be positive number.");
+ }
+ this.topValue = topValue;
+ }
+
+ public boolean isPercentage()
+ {
+ return isPercentage;
+ }
+
+ public void setPercentage(boolean isPercentage)
+ {
+ this.isPercentage = isPercentage;
+ }
+
+ /**
+ * Output port that emits a map of <string,object>.
+ */
+ public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperator.java
new file mode 100644
index 0000000..52ddac6
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperator.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.streamquery.condition.Condition;
+
+/**
+ * An implementation of BaseOperator that provides sql update query semantic on live data stream. <br>
+ * <p>
+ * Stream rows passing condition are emitted on output port stream. <br>
+ * <br>
+ * <b>StateFull : NO,</b> all row data is processed in current time window. <br>
+ * <b>Partitions : Yes, </b> No Input dependency among input rows. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b> inport : </b> Input hash map(row) port, expects HashMap<String,Object><<br>
+ * <b> outport : </b> Output hash map(row) port, emits HashMap<String,Object><br>
+ * <br>
+ * <b> Properties : <b> <br>
+ * <b> condition : </b> Select condition for selecting rows. <br>
+ * <b> columns : </b> Column names/aggregate functions for select. <br>
+ * <br>
+ * @displayName Update
+ * @category Stream Manipulators
+ * @tags sql update operator, sql condition
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+public class UpdateOperator extends BaseOperator
+{
+ /**
+ * Update value map.
+ */
+ Map<String, Object> updates = new HashMap<String, Object>();
+
+ /**
+ * condition.
+ */
+ private Condition condition = null;
+
+ /**
+ * set condition.
+ */
+ public void setCondition(Condition condition)
+ {
+ this.condition = condition;
+ }
+
+ /**
+ * Input port that takes a map of <string,object>.
+ */
+ public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+ {
+ @Override
+ public void process(Map<String, Object> tuple)
+ {
+ if ((condition != null) && (!condition.isValidRow(tuple))) {
+ return;
+ }
+ if (updates.size() == 0) {
+ outport.emit(tuple);
+ return;
+ }
+ Map<String, Object> result = new HashMap<String, Object>();
+ for (Map.Entry<String, Object> entry : tuple.entrySet()) {
+ if (updates.containsKey(entry.getKey())) {
+ result.put(entry.getKey(), updates.get(entry.getKey()));
+ } else {
+ result.put(entry.getKey(), entry.getValue());
+ }
+ }
+ outport.emit(result);
+ }
+ };
+
+ /**
+ * Output port that emits a map of <string,object>.
+ */
+ public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
+
+ /**
+ * Add update value.
+ */
+ public void addUpdate(String name, Object value)
+ {
+ updates.put(name, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/BetweenCondition.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/BetweenCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/BetweenCondition.java
new file mode 100644
index 0000000..155470c
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/BetweenCondition.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.condition;
+
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+
+/**
+ * A derivation of Condition that validates row by checking if the given column name value lies between given left,right range. <br>
+ * <p>
+ * <b>Properties : </b> <br>
+ * <b> column : </b> Name of column. <br>
+ * <b> leftValue : </b> left range of column value. <br>
+ * <b> rightValue : </b> right range od column value. <br>
+ * <br>
+ * @displayName Between Condition
+ * @category Stream Manipulators
+ * @tags sql condition
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class BetweenCondition extends Condition
+{
+ /**
+ * Column name to be checked.
+ */
+ @NotNull
+ private String column;
+
+ /**
+ * Left range value.
+ */
+ @NotNull
+ private Object leftValue;
+
+ /**
+ * Right range value.
+ */
+ @NotNull
+ private Object rightValue;
+
+ /**
+ * @param column Name of column, must be non null. <br>
+ * @param leftValue Left range for value, mut be non null. <br>
+ * @param rightValue right range for value, mut be non null. <br>
+ */
+ public BetweenCondition(@NotNull String column, @NotNull Object leftValue, @NotNull Object rightValue)
+ {
+ this.column = column;
+ this.leftValue = leftValue;
+ this.rightValue = rightValue;
+ }
+
+ /**
+ * Validate given row.
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public boolean isValidRow(@NotNull Map<String, Object> row)
+ {
+ if (!row.containsKey(column)) {
+ return false;
+ }
+ Object value = row.get(column);
+ if (value == null) {
+ return false;
+ }
+ if (((Comparable)value).compareTo((Comparable)leftValue) < 0) {
+ return false;
+ }
+ if (((Comparable)value).compareTo((Comparable)rightValue) > 0) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Must not be called.
+ */
+ @Override
+ public boolean isValidJoin(@NotNull Map<String, Object> row1, Map<String, Object> row2)
+ {
+ assert (false);
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/CompoundCondition.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/CompoundCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/CompoundCondition.java
new file mode 100644
index 0000000..d606991
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/CompoundCondition.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.condition;
+
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+
+/**
+ * A derivation of Condition index that implements logical AND/OR select expression. <br>
+ * <p>
+ * Class provides logical OR or AND function specified in parameters. User can implement
+ * complex and/or expression by chaining operator itself.
+ * <br>
+ * <b> Properties : </b> <br>
+ * <b> leftCondition : </b> Left validate row condition . <br>
+ * <b> rightCondition : </b> Right validate row condition. <br>
+ * <b> logicalOr : </b> OR/AND logical metric flag. <br>
+ * <br>
+ * @displayName Compound Condition
+ * @category Stream Manipulators
+ * @tags sql condition, logical
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class CompoundCondition extends Condition
+{
+ /**
+ * Left validate row condition .
+ */
+ @NotNull
+ private Condition leftCondition;
+
+ /**
+ * Right validate row condition .
+ */
+ @NotNull
+ private Condition rightCondition;
+
+ /**
+ * AND/OR metric flag.
+ */
+ private boolean logicalOr = true;
+
+ /**
+ * Constructor for logical or metric.
+ *
+ * @param leftCondition Left validate row condition, must be non null. <br>
+ * @param rightCondition Right validate row condition, must be non null. <br>
+ */
+ public CompoundCondition(Condition leftCondition, Condition rightCondition)
+ {
+ this.leftCondition = leftCondition;
+ this.rightCondition = rightCondition;
+ }
+
+ /**
+ * Constructor for logical and metric if logical and parameter is true.
+ * <br>
+ *
+ * @param leftCondition Left validate row condition, must be non null. <br>
+ * @param rightCondition Right validate row condition, must be non null. <br>
+ * @param isLogicalAnd Logical AND if true.
+ */
+ public CompoundCondition(Condition leftCondition, Condition rightCondition, boolean isLogicalAnd)
+ {
+ this.leftCondition = leftCondition;
+ this.rightCondition = rightCondition;
+ logicalOr = !isLogicalAnd;
+ }
+
+ @Override
+ public boolean isValidRow(Map<String, Object> row)
+ {
+ if (logicalOr) {
+ return leftCondition.isValidRow(row) || rightCondition.isValidRow(row);
+ } else {
+ return leftCondition.isValidRow(row) && rightCondition.isValidRow(row);
+ }
+ }
+
+ @Override
+ public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> row2)
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public Condition getLeftCondition()
+ {
+ return leftCondition;
+ }
+
+ public void setLeftCondition(Condition leftCondition)
+ {
+ this.leftCondition = leftCondition;
+ }
+
+ public Condition getRightCondition()
+ {
+ return rightCondition;
+ }
+
+ public void setRightCondition(Condition rightCondition)
+ {
+ this.rightCondition = rightCondition;
+ }
+
+ public void setLogicalAnd()
+ {
+ this.logicalOr = false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/EqualValueCondition.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/EqualValueCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/EqualValueCondition.java
new file mode 100644
index 0000000..a54960d
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/EqualValueCondition.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.condition;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+
+/**
+ * An implementation of condition on column equality.
+ * <p>
+ * A valid row must have all key/value map in column name/value map.
+ *
+ * <b> Properties : </b> <br>
+ * <b> equalMap : </b> Column equal value map store.
+ * @displayName Equal Value Condition
+ * @category Stream Manipulators
+ * @tags sql condition
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class EqualValueCondition extends Condition
+{
+
+ /**
+ * Equal column map.
+ */
+ private HashMap<String, Object> equalMap = new HashMap<String, Object>();
+
+ /**
+ * Add column equal condition.
+ */
+ public void addEqualValue(String column, Object value)
+ {
+ equalMap.put(column, value);
+ }
+
+ /**
+ * Check valid row.
+ */
+ @Override
+ public boolean isValidRow(Map<String, Object> row)
+ {
+ // no conditions
+ if (equalMap.size() == 0) {
+ return true;
+ }
+
+ // compare each condition value
+ for (Map.Entry<String, Object> entry : equalMap.entrySet()) {
+ if (!row.containsKey(entry.getKey())) {
+ return false;
+ }
+ Object value = row.get(entry.getKey());
+ if (entry.getValue() == null) {
+ if (value == null) {
+ return true;
+ }
+ return false;
+ }
+ if (value == null) {
+ return false;
+ }
+ if (!entry.getValue().equals(value)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * check valid join, not implemented
+ *
+ * @return false
+ */
+ @Override
+ public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> row2)
+ {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCompareValue.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCompareValue.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCompareValue.java
new file mode 100644
index 0000000..6e0400b
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCompareValue.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.condition;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.function.FunctionIndex;
+
+/**
+ * A derivation of HavingCondition that implements comparison of aggregate index value to input compare value. <br>
+ * <p>
+ * Compare value must implement interface Comparable. <br>
+ * <br>
+ * <b> Properties : </b>
+ * <b> compareValue : </b> Value to be compared. <br>
+ * <b> compareType : </b> Type of comparison -1 == lt, 0 == eq, 1 == gt. <br>
+ * @displayName Having Compare Value
+ * @category Stream Manipulators
+ * @tags compare, sql condition
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+@SuppressWarnings("rawtypes")
+public class HavingCompareValue<T extends Comparable> extends HavingCondition
+{
+ /**
+ * Value to be compared.
+ */
+ private T compareValue;
+
+ /**
+ * Type of comparison -1 == lt, 0 == eq, 1 == gt.
+ */
+ private int compareType;
+
+ /**
+ * @param aggregateIndex aggregate index for comparison. <br>
+ * @param compareValue Value to be compared. <br>
+ * @param compareType Type of comparison -1 == lt, 0 == eq, 1 == gt. <br>
+ */
+ public HavingCompareValue(FunctionIndex aggregateIndex, T compareValue, int compareType)
+ {
+ super(aggregateIndex);
+ this.compareValue = compareValue;
+ this.compareType = compareType;
+ }
+
+ /**
+ * Validate aggregate override. <br>
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public boolean isValidAggregate(@NotNull ArrayList<Map<String, Object>> rows) throws Exception
+ {
+ Object computed = aggregateIndex.compute(rows);
+ return (compareType == compareValue.compareTo(computed));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCondition.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCondition.java
new file mode 100644
index 0000000..66ef35c
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCondition.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.condition;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.function.FunctionIndex;
+
+/**
+ * A base class for Group,Having operator with aggregate index constraint.&nsbsp; Subclasses should provide the
+ implementation to check if aggregate is valid.
+ * <p>
+ * @displayName Having Condition
+ * @category Stream Manipulators
+ * @tags sql condition, index, group
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public abstract class HavingCondition
+{
+ /**
+ * Aggregate index to be validated.
+ */
+ protected FunctionIndex aggregateIndex = null;
+
+ /**
+ * @param aggregateIndex Aggregate index to be validated.
+ */
+ public HavingCondition(FunctionIndex aggregateIndex)
+ {
+ this.aggregateIndex = aggregateIndex;
+ }
+
+ /**
+ * Check if aggregate is valid.
+ */
+ public abstract boolean isValidAggregate(@NotNull ArrayList<Map<String, Object>> rows) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/InCondition.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/InCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/InCondition.java
new file mode 100644
index 0000000..d19bb99
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/InCondition.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.condition;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+
+/**
+ * An implementation of condition class to check if a column value is in a given set of values.
+ * <p>
+ * <br>
+ * <b>Properties : </b> <br>
+ * <b> column : </b> Column name for which value is checked in values set. <br>
+ * <b> inValues : </b> Set of values in which column value is checked. <br>
+ * @displayName In Condition
+ * @category Stream Manipulators
+ * @tags sql condition
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class InCondition extends Condition
+{
+ /**
+ * Column name for which value is checked in values set.
+ */
+ @NotNull
+ private String column;
+
+ /**
+ * Set of values in which column value is checked.
+ */
+ private Set<Object> inValues = new HashSet<Object>();
+
+ /**
+ * @param column Column name for which value is checked in values set.
+ */
+ public InCondition(@NotNull String column)
+ {
+ this.column = column;
+ }
+
+ @Override
+ public boolean isValidRow(@NotNull Map<String, Object> row)
+ {
+ if (!row.containsKey(column)) {
+ return false;
+ }
+ return inValues.contains(row.get(column));
+ }
+
+ @Override
+ public boolean isValidJoin(@NotNull Map<String, Object> row1, @NotNull Map<String, Object> row2)
+ {
+ return false;
+ }
+
+ public String getColumn()
+ {
+ return column;
+ }
+
+ public void setColumn(String column)
+ {
+ this.column = column;
+ }
+
+ public void addInValue(Object value)
+ {
+ this.inValues.add(value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/LikeCondition.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/LikeCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/LikeCondition.java
new file mode 100644
index 0000000..a8789fa
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/LikeCondition.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.condition;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+
+/**
+ * An implementation of condition class to filter rows for which given column name value matches given regular expression. <br>
+ *<p>
+ *<b> Properties : </b> <br>
+ *<b> column : < /b> Column to be matched with regular expression. <br>
+ *<b> pattern : </b> Regular expression pattern.<br>
+ * @displayName Like Condition
+ * @category Stream Manipulators
+ * @tags sql, like condition, regular expression
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class LikeCondition extends Condition
+{
+ /**
+ * Column to be matched with regular expression.
+ */
+ @NotNull
+ private String column;
+
+ /**
+ * Regular expression pattern.
+ */
+ @NotNull
+ private Pattern pattern;
+
+ /**
+ * @param column Column to be matched with regular expression, must be non-null.
+ * @param pattern Regular expression pattern, must be non-null.
+ */
+ public LikeCondition(@NotNull String column,@NotNull String pattern)
+ {
+ setColumn(column);
+ setPattern(pattern);
+ }
+
+ /**
+ * For valid row column value string must match regular expression.
+ * @return row valid status.
+ */
+ @Override
+ public boolean isValidRow(Map<String, Object> row)
+ {
+ if (!row.containsKey(column)) {
+ return false;
+ }
+ Matcher match = pattern.matcher((CharSequence)row.get(column));
+ return match.find();
+ }
+
+ /**
+ * Must not be called.
+ */
+ @Override
+ public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> row2)
+ {
+ assert (false);
+ return false;
+ }
+
+ public String getColumn()
+ {
+ return column;
+ }
+
+ public void setColumn(String column)
+ {
+ this.column = column;
+ }
+
+ public void setPattern(String pattern)
+ {
+ this.pattern = Pattern.compile(pattern);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/AverageFunction.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/AverageFunction.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/AverageFunction.java
new file mode 100644
index 0000000..72ac59f
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/AverageFunction.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.function;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * An implementation of function index that implements average function semantics. <br>
+ * <p>
+ * e.g : sql => SELECT AVG(column_name) FROM table_name. <br>
+ * <br>
+ * <b> Properties : </b> <br>
+ * <b> column : </b> Aggregate over given column values. <br>
+ * <b> alias : </b> Alias name for aggregate output. <br>
+ * @displayName Average Function
+ * @category Stream Manipulators
+ * @tags sql average
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class AverageFunction extends FunctionIndex
+{
+ /**
+ * @param column Aggregate over given column values, must be non null.
+ * @param alias Alias name for aggregate output.
+ */
+ public AverageFunction(@NotNull String column, String alias)
+ {
+ super(column, alias);
+ }
+
+ /**
+ * Compute average for given column values.
+ */
+ @Override
+ public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception
+ {
+ if (rows.size() == 0) {
+ return 0.0;
+ }
+ double sum = 0.0;
+ for (Map<String, Object> row : rows) {
+ sum += ((Number)row.get(column)).doubleValue();
+ }
+ return sum / rows.size();
+ }
+
+ /**
+ * Get aggregate name.
+ * @return name.
+ */
+ @Override
+ protected String aggregateName()
+ {
+ if (!StringUtils.isEmpty(alias)) {
+ return alias;
+ }
+ return "AVG(" + column + ")";
+ }
+}