You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/21 16:58:28 UTC

[3/4] apex-malhar git commit: APEXMALHAR-1818 SQL Support for converting given SQL statement to APEX DAG.

APEXMALHAR-1818 SQL Support for converting given SQL statement to APEX DAG.

Features implemented are:
1. SELECT STATEMENT
2. INSERT STATEMENT
3. INNER JOIN with non-empty equi join condition
4. WHERE clause
5. SCALAR functions implemented in calcite are ready to use
6. Custom scalar functions can be registered.
7. Endpoint can be File OR Kafka OR Streaming Port for both input and output
8. CSV Data Format implemented for both input and output side.
9. Testing on local as well as cluster mode.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c92ca15e
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c92ca15e
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c92ca15e

Branch: refs/heads/master
Commit: c92ca15e81395d01ece037413d4426d5824b6670
Parents: b968e4c
Author: Chinmay Kolhatkar <ch...@datatorrent.com>
Authored: Thu Oct 13 17:35:16 2016 +0530
Committer: Chinmay Kolhatkar <ch...@datatorrent.com>
Committed: Fri Oct 21 21:24:30 2016 +0530

----------------------------------------------------------------------
 kafka/pom.xml                                   |   7 +
 .../apache/apex/malhar/kafka/EmbeddedKafka.java | 165 +++++++++
 .../lib/transform/TransformOperator.java        |   6 +-
 .../apex/malhar/lib/utils/ClassLoaderUtils.java |  68 ++++
 pom.xml                                         |   1 +
 sql/pom.xml                                     | 213 +++++++++++
 .../apex/malhar/sql/SQLExecEnvironment.java     | 262 ++++++++++++++
 .../malhar/sql/codegen/BeanClassGenerator.java  | 318 +++++++---------
 .../malhar/sql/codegen/ExpressionCompiler.java  | 113 ++++++
 .../sql/operators/FilterTransformOperator.java  |  71 ++++
 .../malhar/sql/operators/InnerJoinOperator.java |  54 +++
 .../apex/malhar/sql/operators/LineReader.java   |  73 ++++
 .../malhar/sql/operators/OperatorUtils.java     |  79 ++++
 .../apex/malhar/sql/planner/ApexRelNode.java    | 341 +++++++++++++++++
 .../apache/apex/malhar/sql/planner/RelInfo.java | 121 +++++++
 .../apex/malhar/sql/planner/RelNodeVisitor.java | 112 ++++++
 .../apex/malhar/sql/schema/ApexSQLTable.java    | 127 +++++++
 .../malhar/sql/schema/ApexSQLTableFactory.java  |  66 ++++
 .../malhar/sql/schema/TupleSchemaRegistry.java  | 227 ++++++++++++
 .../apex/malhar/sql/table/CSVMessageFormat.java | 138 +++++++
 .../apache/apex/malhar/sql/table/Endpoint.java  | 100 +++++
 .../apex/malhar/sql/table/FileEndpoint.java     | 119 ++++++
 .../apex/malhar/sql/table/KafkaEndpoint.java    | 136 +++++++
 .../apex/malhar/sql/table/MessageFormat.java    |  88 +++++
 .../apex/malhar/sql/table/StreamEndpoint.java   | 147 ++++++++
 .../apex/malhar/sql/FileEndpointTest.java       | 249 +++++++++++++
 .../org/apache/apex/malhar/sql/InputPOJO.java   |  69 ++++
 .../apex/malhar/sql/KafkaEndpointTest.java      | 362 +++++++++++++++++++
 .../org/apache/apex/malhar/sql/OutputPOJO.java  |  59 +++
 .../org/apache/apex/malhar/sql/SerDeTest.java   | 223 ++++++++++++
 .../apex/malhar/sql/StreamEndpointTest.java     | 179 +++++++++
 .../sql/codegen/BeanClassGeneratorTest.java     |  23 +-
 sql/src/test/resources/input.csv                |   6 +
 sql/src/test/resources/log4j.properties         |  50 +++
 .../test/resources/model/model_file_csv.json    |  27 ++
 35 files changed, 4195 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/kafka/pom.xml b/kafka/pom.xml
index 5c9aee9..e9683e1 100755
--- a/kafka/pom.xml
+++ b/kafka/pom.xml
@@ -242,5 +242,12 @@
       <version>${apex.core.version}</version>
       <type>jar</type>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>0.9.0.0</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
new file mode 100644
index 0000000..5ddcb18
--- /dev/null
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import com.google.common.base.Throwables;
+
+import kafka.admin.AdminUtils;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+
+public class EmbeddedKafka
+{
+  private static final String KAFKA_PATH = "/tmp/kafka-test";
+
+  private ZkClient zkClient;
+  private ZkUtils zkUtils;
+  private String BROKERHOST = "127.0.0.1";
+  private String BROKERPORT = "9092";
+  private EmbeddedZookeeper zkServer;
+  private KafkaServer kafkaServer;
+
+  public String getBroker()
+  {
+    return BROKERHOST + ":" + BROKERPORT;
+  }
+
+  public void start() throws IOException
+  {
+    // Find port
+    try {
+      ServerSocket serverSocket = new ServerSocket(0);
+      BROKERPORT = Integer.toString(serverSocket.getLocalPort());
+      serverSocket.close();
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+
+    // Setup Zookeeper
+    zkServer = new EmbeddedZookeeper();
+    String zkConnect = BROKERHOST + ":" + zkServer.port();
+    zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+    zkUtils = ZkUtils.apply(zkClient, false);
+
+    // Setup brokers
+    cleanupDir();
+    Properties props = new Properties();
+    props.setProperty("zookeeper.connect", zkConnect);
+    props.setProperty("broker.id", "0");
+    props.setProperty("log.dirs", KAFKA_PATH);
+    props.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+    KafkaConfig config = new KafkaConfig(props);
+    Time mock = new MockTime();
+    kafkaServer = TestUtils.createServer(config, mock);
+  }
+
+  public void stop() throws IOException
+  {
+    kafkaServer.shutdown();
+    zkClient.close();
+    zkServer.shutdown();
+    cleanupDir();
+  }
+
+  private void cleanupDir() throws IOException
+  {
+    FileUtils.deleteDirectory(new File(KAFKA_PATH));
+  }
+
+  public void createTopic(String topic)
+  {
+    AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties());
+    List<KafkaServer> servers = new ArrayList<KafkaServer>();
+    servers.add(kafkaServer);
+    TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 30000);
+  }
+
+  public void publish(String topic, List<String> messages)
+  {
+    Properties producerProps = new Properties();
+    producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
+    producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
+    producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+    try (KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps)) {
+      for (String message : messages) {
+        ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(topic, message.getBytes(StandardCharsets.UTF_8));
+        producer.send(data);
+      }
+    }
+
+    List<KafkaServer> servers = new ArrayList<KafkaServer>();
+    servers.add(kafkaServer);
+    TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 30000);
+  }
+
+  public List<String> consume(String topic, int timeout)
+  {
+    return consume(topic, timeout, true);
+  }
+
+  public List<String> consume(String topic, int timeout, boolean earliest)
+  {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
+    consumerProps.setProperty("group.id", "group0");
+    consumerProps.setProperty("client.id", "consumer0");
+    consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
+    consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.put("auto.offset.reset", earliest ? "earliest" : "latest");  // to make sure the consumer starts from the beginning of the topic
+    KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
+    consumer.subscribe(Arrays.asList(topic));
+
+    List<String> messages = new ArrayList<>();
+
+    ConsumerRecords<Integer, byte[]> records = consumer.poll(timeout);
+    for (ConsumerRecord<Integer, byte[]> record : records) {
+      messages.add(new String(record.value()));
+    }
+
+    consumer.close();
+
+    return messages;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java b/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java
index 483748e..6ed8819 100644
--- a/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java
@@ -56,12 +56,12 @@ public class TransformOperator extends BaseOperator implements Operator.Activati
 {
   @NotNull
   private Map<String, String> expressionMap = new HashMap<>();
-  private List<String> expressionFunctions = new LinkedList<>();
+  protected List<String> expressionFunctions = new LinkedList<>();
   private boolean copyMatchingFields = true;
 
   private transient Map<PojoUtils.Setter, Expression> transformationMap = new HashMap<>();
-  private Class<?> inputClass;
-  private Class<?> outputClass;
+  protected Class<?> inputClass;
+  protected Class<?> outputClass;
 
   public TransformOperator()
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/library/src/main/java/org/apache/apex/malhar/lib/utils/ClassLoaderUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/ClassLoaderUtils.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/ClassLoaderUtils.java
new file mode 100644
index 0000000..855c21f
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/ClassLoaderUtils.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils;
+
+import java.io.IOException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+@InterfaceStability.Evolving
+public class ClassLoaderUtils
+{
+  /**
+   * Given the class name it reads and loads the class from the input stream.
+   *
+   * @param fqcn        fully qualified class name.
+   * @param inputStream stream from which class is read.
+   * @return loaded class
+   * @throws IOException
+   */
+  public static Class<?> readBeanClass(String fqcn, FSDataInputStream inputStream) throws IOException
+  {
+    byte[] bytes = IOUtils.toByteArray(inputStream);
+    inputStream.close();
+    return new ByteArrayClassLoader().defineClass(fqcn, bytes);
+  }
+
+  /**
+   * Given the class name it reads and loads the class from given byte array.
+   *
+   * @param fqcn       fully qualified class name.
+   * @param inputClass byte[] from which class is read.
+   * @return loaded class
+   * @throws IOException
+   */
+  public static Class<?> readBeanClass(String fqcn, byte[] inputClass) throws IOException
+  {
+    return new ByteArrayClassLoader().defineClass(fqcn, inputClass);
+  }
+
+  /**
+   * Byte Array class loader for loading class from byte[]
+   */
+  public static class ByteArrayClassLoader extends ClassLoader
+  {
+    Class<?> defineClass(String name, byte[] ba)
+    {
+      return defineClass(name, ba, 0, ba.length);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7a392d0..da7c38b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -204,6 +204,7 @@
         <module>benchmark</module>
         <module>apps</module>
         <module>samples</module>
+        <module>sql</module>
       </modules>
     </profile>
   </profiles>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/pom.xml
----------------------------------------------------------------------
diff --git a/sql/pom.xml b/sql/pom.xml
new file mode 100644
index 0000000..24ef44c
--- /dev/null
+++ b/sql/pom.xml
@@ -0,0 +1,213 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar</artifactId>
+    <version>3.6.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>malhar-sql</artifactId>
+  <name>Apache Apex Malhar SQL Support</name>
+  <packaging>jar</packaging>
+
+  <build>
+    <plugins>
+      <!-- Publish tests jar -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>package</phase>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-common</artifactId>
+      <version>${apex.core.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.parent.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-contrib</artifactId>
+      <version>${project.parent.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-kafka</artifactId>
+      <version>${project.parent.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-core</artifactId>
+      <version>1.7.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.calcite.avatica</groupId>
+          <artifactId>avatica-metrics</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpcore</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-dbcp</groupId>
+          <artifactId>commons-dbcp</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.pentaho</groupId>
+          <artifactId>pentaho-aggdesigner-algorithm</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- For BeanClassGenerator -->
+    <!-- use shaded asm library to avoid conflict -->
+    <dependency>
+      <groupId>org.apache.xbean</groupId>
+      <artifactId>xbean-asm5-shaded</artifactId>
+      <version>4.3</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- For CSV Parser -->
+    <dependency>
+      <groupId>net.sf.supercsv</groupId>
+      <artifactId>super-csv</artifactId>
+      <version>2.4.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.github.fge</groupId>
+      <artifactId>json-schema-validator</artifactId>
+      <version>2.0.1</version>
+    </dependency>
+
+    <!-- Kafka Dependency -->
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>0.9.0.0</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>0.9.0.0</version>
+    </dependency>
+
+    <!-- For KafkaTest -->
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-kafka</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>0.9.0.0</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/SQLExecEnvironment.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/SQLExecEnvironment.java b/sql/src/main/java/org/apache/apex/malhar/sql/SQLExecEnvironment.java
new file mode 100644
index 0000000..de146f6
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/SQLExecEnvironment.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.sql.planner.RelNodeVisitor;
+import org.apache.apex.malhar.sql.schema.ApexSQLTable;
+import org.apache.apex.malhar.sql.table.Endpoint;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.ScalarFunction;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.util.Util;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.DAG;
+
+/**
+ * SQL Execution Environment for SQL integration API of Apex.
+ * This exposes calcite functionality in simple way. Most of the APIs are with builder pattern which makes it
+ * easier to construct a DAG using this object.
+ *
+ * Eg.
+ * <pre>
+ * SQLExecEnvironment.getEnvironment(dag)
+ *                   .registerTable("TABLE1", Object of type {@link Endpoint})
+ *                   .registerTable("TABLE2", Object of type {@link Endpoint})
+ *                   .executeSQL("INSERT INTO TABLE2 SELECT STREAM * FROM TABLE1);
+ * </pre>
+ *
+ * Above code will evaluate SQL statement and convert the resultant Relational Algebra to a sub-DAG.
+ */
+@InterfaceStability.Evolving
+public class SQLExecEnvironment
+{
+  private static final Logger logger = LoggerFactory.getLogger(SQLExecEnvironment.class);
+
+  private final JavaTypeFactoryImpl typeFactory;
+  private SchemaPlus schema = Frameworks.createRootSchema(true);
+
+  /**
+   * Construct SQL Execution Environment which works on given DAG objec.
+   */
+  private SQLExecEnvironment()
+  {
+    this.typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+  }
+
+  /**
+   * Given SQLExec{@link SQLExecEnvironment} object for given {@link DAG}.
+   *
+   * @return Returns {@link SQLExecEnvironment} object
+   */
+  public static SQLExecEnvironment getEnvironment()
+  {
+    return new SQLExecEnvironment();
+  }
+
+  /**
+   * Use given model file to initialize {@link SQLExecEnvironment}.
+   * The model file contains definitions of endpoints and data formats.
+   * Example of file format is like following:
+   * <pre>
+   *   {
+   *     version: '1.0',
+   *     defaultSchema: 'APEX',
+   *     schemas: [{
+   *       name: 'APEX',
+   *       tables: [
+   *         {
+   *            name: 'ORDERS',
+   *            type: 'custom',
+   *            factory: 'org.apache.apex.malhar.sql.schema.ApexSQLTableFactory',
+   *            stream: {
+   *              stream: true
+   *            },
+   *            operand: {
+   *              endpoint: 'file',
+   *              messageFormat: 'csv',
+   *              endpointOperands: {
+   *                directory: '/tmp/input'
+   *              },
+   *              messageFormatOperands: {
+   *                schema: '{"separator":",","quoteChar":"\\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}'
+   *            }
+   *            }
+   *         }
+   *       ]
+   *     }]
+   *   }
+   * </pre>
+   *
+   * @param model String content of model file.
+   * @return Returns this {@link SQLExecEnvironment}
+   */
+  public SQLExecEnvironment withModel(String model)
+  {
+    if (model == null) {
+      return this;
+    }
+
+    Properties p = new Properties();
+    p.put("model", "inline:" + model);
+    try (Connection connection = DriverManager.getConnection("jdbc:calcite:", p)) {
+      CalciteConnection conn = connection.unwrap(CalciteConnection.class);
+      this.schema = conn.getRootSchema().getSubSchema(connection.getSchema());
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+
+    return this;
+  }
+
+  /**
+   * Register a table using {@link Endpoint} with this {@link SQLExecEnvironment}
+   *
+   * @param name Name of the table that needs to be present with SQL Statement.
+   * @param endpoint Object of type {@link Endpoint}
+   *
+   * @return Returns this {@link SQLExecEnvironment}
+   */
+  public SQLExecEnvironment registerTable(String name, Endpoint endpoint)
+  {
+    Preconditions.checkNotNull(name, "Table name cannot be null");
+    registerTable(name, new ApexSQLTable(schema, name, endpoint));
+    return this;
+  }
+
+  /**
+   * Register a table using {@link Table} with this {@link SQLExecEnvironment}
+   *
+   * @param name Name of the table that needs to be present with SQL statement
+   * @param table Object of type {@link Table}
+   *
+   * @return Returns this {@link SQLExecEnvironment}
+   */
+  public SQLExecEnvironment registerTable(String name, Table table)
+  {
+    Preconditions.checkNotNull(name, "Table name cannot be null");
+    schema.add(name, table);
+    return this;
+  }
+
+  /**
+   * Register custom function with this {@link SQLExecEnvironment}
+   *
+   * @param name Name of the scalar SQL function that needs made available to SQL Statement
+   * @param fn Object of type {@link Function}
+   *
+   * @return Returns this {@link SQLExecEnvironment}
+   */
+  public SQLExecEnvironment registerFunction(String name, Function fn)
+  {
+    Preconditions.checkNotNull(name, "Function name cannot be null");
+    schema.add(name, fn);
+    return this;
+  }
+
+  /**
+   * Register custom function from given static method with this {@link SQLExecEnvironment}
+   *
+   * @param name Name of the scalar SQL function that needs make available to SQL Statement
+   * @param clazz {@link Class} which contains given static method
+   * @param methodName Name of the method from given clazz
+   *
+   * @return Return this {@link SQLExecEnvironment}
+   */
+  public SQLExecEnvironment registerFunction(String name, Class clazz, String methodName)
+  {
+    Preconditions.checkNotNull(name, "Function name cannot be null");
+    ScalarFunction scalarFunction = ScalarFunctionImpl.create(clazz, methodName);
+    return registerFunction(name, scalarFunction);
+  }
+
+  /**
+   * This is the main method takes SQL statement as input and contructs a DAG using contructs registered with this
+   * {@link SQLExecEnvironment}.
+   *
+   * @param sql SQL statement that should be converted to a DAG.
+   */
+  public void executeSQL(DAG dag, String sql)
+  {
+    FrameworkConfig config = buildFrameWorkConfig();
+    Planner planner = Frameworks.getPlanner(config);
+    try {
+      logger.info("Parsing SQL statement: {}", sql);
+      SqlNode parsedTree = planner.parse(sql);
+      SqlNode validatedTree = planner.validate(parsedTree);
+      RelNode relationalTree = planner.rel(validatedTree).rel;
+      logger.info("RelNode relationalTree generate from SQL statement is:\n {}",
+          Util.toLinux(RelOptUtil.toString(relationalTree)));
+      RelNodeVisitor visitor = new RelNodeVisitor(dag, typeFactory);
+      visitor.traverse(relationalTree);
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    } finally {
+      planner.close();
+    }
+  }
+
+  /**
+   * Method method build a calcite framework configuration for calcite to parse SQL and generate relational tree
+   * out of it.
+   * @return FrameworkConfig
+   */
+  private FrameworkConfig buildFrameWorkConfig()
+  {
+    List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+    sqlOperatorTables.add(SqlStdOperatorTable.instance());
+    sqlOperatorTables
+      .add(new CalciteCatalogReader(CalciteSchema.from(schema), false, Collections.<String>emptyList(), typeFactory));
+    return Frameworks.newConfigBuilder().defaultSchema(schema)
+      .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build())
+      .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)).build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/codegen/BeanClassGenerator.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/codegen/BeanClassGenerator.java b/sql/src/main/java/org/apache/apex/malhar/sql/codegen/BeanClassGenerator.java
index ffeafb5..3e384a2 100644
--- a/sql/src/main/java/org/apache/apex/malhar/sql/codegen/BeanClassGenerator.java
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/codegen/BeanClassGenerator.java
@@ -25,9 +25,7 @@ import java.util.Map;
 import org.codehaus.jettison.json.JSONException;
 
 import org.apache.apex.malhar.sql.schema.TupleSchemaRegistry;
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.xbean.asm5.ClassWriter;
 import org.apache.xbean.asm5.Opcodes;
@@ -55,16 +53,26 @@ public class BeanClassGenerator
 {
   public static final ImmutableMap<String, Character> PRIMITIVE_TYPES;
 
+  public static final char typeIdentifierBoolean = 'Z';
+  public static final char typeIdentifierChar = 'C';
+  public static final char typeIdentifierByte = 'B';
+  public static final char typeIdentifierShort = 'S';
+  public static final char typeIdentifierInt = 'I';
+  public static final char typeIdentifierFloat = 'F';
+  public static final char typeIdentifierLong = 'J';
+  public static final char typeIdentifierDouble = 'D';
+
+
   static {
     Map<String, Character> types = Maps.newHashMap();
-    types.put("boolean", 'Z');
-    types.put("char", 'C');
-    types.put("byte", 'B');
-    types.put("short", 'S');
-    types.put("int", 'I');
-    types.put("float", 'F');
-    types.put("long", 'J');
-    types.put("double", 'D');
+    types.put("boolean", typeIdentifierBoolean);
+    types.put("char", typeIdentifierChar);
+    types.put("byte", typeIdentifierByte);
+    types.put("short", typeIdentifierShort);
+    types.put("int", typeIdentifierInt);
+    types.put("float", typeIdentifierFloat);
+    types.put("long", typeIdentifierLong);
+    types.put("double", typeIdentifierDouble);
     PRIMITIVE_TYPES = ImmutableMap.copyOf(types);
   }
 
@@ -100,7 +108,7 @@ public class BeanClassGenerator
   {
     ClassNode classNode = new ClassNode();
 
-    classNode.version = Opcodes.V1_6;  //generated class will only run on JRE 1.6 or above
+    classNode.version = Opcodes.V1_7;  //generated class will only run on JRE 1.7 or above
     classNode.access = Opcodes.ACC_PUBLIC;
 
     classNode.name = fqcn.replace('.', '/');
@@ -116,38 +124,15 @@ public class BeanClassGenerator
       String fieldType = fieldInfo.getType().getJavaType().getName();
       String fieldJavaType = getJavaType(fieldType);
 
-      // Add private field
-      FieldNode fieldNode = new FieldNode(Opcodes.ACC_PRIVATE, fieldName, fieldJavaType, null, null);
-      classNode.fields.add(fieldNode);
+      addPrivateField(classNode, fieldName, fieldJavaType);
 
       String fieldNameForMethods = Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
 
-      switch (fieldType) {
-        case "boolean":
-          addIntGetterNSetter(classNode, fieldName, fieldNameForMethods, fieldJavaType, true);
-          break;
-        case "byte":
-        case "char":
-        case "short":
-        case "int":
-          addIntGetterNSetter(classNode, fieldName, fieldNameForMethods, fieldJavaType, false);
-          break;
-        case "long":
-          addLongGetterNSetter(classNode, fieldName, fieldNameForMethods, fieldJavaType);
-          break;
-        case "float":
-          addFloatGetterNSetter(classNode, fieldName, fieldNameForMethods, fieldJavaType);
-          break;
-        case "double":
-          addDoubleGetterNSetter(classNode, fieldName, fieldNameForMethods, fieldJavaType);
-          break;
-        default:
-          if (fieldJavaType.equals(getJavaType("java.util.Date"))) {
-            addDateFields(classNode, fieldName, fieldNameForMethods, "java/util/Date");
-          } else {
-            addObjectGetterNSetter(classNode, fieldName, fieldNameForMethods, fieldJavaType);
-          }
-          break;
+      if (fieldJavaType.equals(getJavaType("java.util.Date"))) {
+        addDateFields(classNode, fieldName, fieldNameForMethods, "java/util/Date");
+      } else {
+        addGetter(classNode, fieldName, fieldNameForMethods, fieldJavaType);
+        addSetter(classNode, fieldName, fieldNameForMethods, fieldJavaType);
       }
     }
 
@@ -170,6 +155,11 @@ public class BeanClassGenerator
     return classBytes;
   }
 
+  /**
+   * Add Default constructor for POJO
+   * @param classNode ClassNode which needs to be populated with constructor
+   */
+  @SuppressWarnings("unchecked")
   private static void addDefaultConstructor(ClassNode classNode)
   {
     MethodNode constructorNode = new MethodNode(Opcodes.ACC_PUBLIC, "<init>", "()V", null, null);
@@ -181,6 +171,95 @@ public class BeanClassGenerator
   }
 
   /**
+   * Add private field to the class
+   * @param classNode ClassNode which needs to be populated with private field.
+   * @param fieldName Name of the field
+   * @param fieldJavaType Java ASM type of the field
+   */
+  @SuppressWarnings("unchecked")
+  private static void addPrivateField(ClassNode classNode, String fieldName, String fieldJavaType)
+  {
+    FieldNode fieldNode = new FieldNode(Opcodes.ACC_PRIVATE, fieldName, fieldJavaType, null, null);
+    classNode.fields.add(fieldNode);
+  }
+
+  /**
+   * Add public getter method for given field
+   * @param classNode ClassNode which needs to be populated with public getter.
+   * @param fieldName Name of the field for which public getter needs to be added.
+   * @param fieldNameForMethods Suffix of the getter method, Prefix "is" or "get" is added by this method.
+   * @param fieldJavaType Java ASM type of the field
+   */
+  @SuppressWarnings("unchecked")
+  private static void addGetter(ClassNode classNode, String fieldName, String fieldNameForMethods, String fieldJavaType)
+  {
+    String getterSignature = "()" + fieldJavaType;
+    MethodNode getterNode = new MethodNode(Opcodes.ACC_PUBLIC,
+        (fieldJavaType.equals(typeIdentifierBoolean) ? "is" : "get") + fieldNameForMethods,
+        getterSignature, null, null);
+    getterNode.instructions.add(new VarInsnNode(Opcodes.ALOAD, 0));
+    getterNode.instructions.add(new FieldInsnNode(Opcodes.GETFIELD, classNode.name, fieldName, fieldJavaType));
+
+    int returnOpCode;
+    if (fieldJavaType.equals(Character.toString(typeIdentifierBoolean)) ||
+        fieldJavaType.equals(Character.toString(typeIdentifierByte))    ||
+        fieldJavaType.equals(Character.toString(typeIdentifierChar))    ||
+        fieldJavaType.equals(Character.toString(typeIdentifierShort))   ||
+        fieldJavaType.equals(Character.toString(typeIdentifierInt))) {
+      returnOpCode = Opcodes.IRETURN;
+    } else if (fieldJavaType.equals(Character.toString(typeIdentifierLong))) {
+      returnOpCode = Opcodes.LRETURN;
+    } else if (fieldJavaType.equals(Character.toString(typeIdentifierFloat))) {
+      returnOpCode = Opcodes.FRETURN;
+    } else if (fieldJavaType.equals(Character.toString(typeIdentifierDouble))) {
+      returnOpCode = Opcodes.DRETURN;
+    } else {
+      returnOpCode = Opcodes.ARETURN;
+    }
+    getterNode.instructions.add(new InsnNode(returnOpCode));
+
+    classNode.methods.add(getterNode);
+  }
+
+  /**
+   * Add public setter for given field
+   * @param classNode ClassNode which needs to be populated with public setter
+   * @param fieldName Name of the field for which setter needs to be added
+   * @param fieldNameForMethods Suffix for setter method. Prefix "set" is added by this method
+   * @param fieldJavaType Java ASM type of the field
+   */
+  @SuppressWarnings("unchecked")
+  private static void addSetter(ClassNode classNode, String fieldName, String fieldNameForMethods, String fieldJavaType)
+  {
+    String setterSignature = '(' + fieldJavaType + ')' + 'V';
+    MethodNode setterNode = new MethodNode(Opcodes.ACC_PUBLIC, "set" + fieldNameForMethods, setterSignature, null,
+        null);
+    setterNode.instructions.add(new VarInsnNode(Opcodes.ALOAD, 0));
+
+    int loadOpCode;
+    if (fieldJavaType.equals(Character.toString(typeIdentifierBoolean)) ||
+        fieldJavaType.equals(Character.toString(typeIdentifierByte))    ||
+        fieldJavaType.equals(Character.toString(typeIdentifierChar))    ||
+        fieldJavaType.equals(Character.toString(typeIdentifierShort))   ||
+        fieldJavaType.equals(Character.toString(typeIdentifierInt))) {
+      loadOpCode = Opcodes.ILOAD;
+    } else if (fieldJavaType.equals(Character.toString(typeIdentifierLong))) {
+      loadOpCode = Opcodes.LLOAD;
+    } else if (fieldJavaType.equals(Character.toString(typeIdentifierFloat))) {
+      loadOpCode = Opcodes.FLOAD;
+    } else if (fieldJavaType.equals(Character.toString(typeIdentifierDouble))) {
+      loadOpCode = Opcodes.DLOAD;
+    } else {
+      loadOpCode = Opcodes.ALOAD;
+    }
+    setterNode.instructions.add(new VarInsnNode(loadOpCode, 1));
+
+    setterNode.instructions.add(new FieldInsnNode(Opcodes.PUTFIELD, classNode.name, fieldName, fieldJavaType));
+    setterNode.instructions.add(new InsnNode(Opcodes.RETURN));
+    classNode.methods.add(setterNode);
+  }
+
+  /**
    * Date field is explicitly handled and provided with 3 variants of types of same data.
    * 1. java.util.Date format
    * 2. long - Epoc time in ms
@@ -327,126 +406,6 @@ public class BeanClassGenerator
     classNode.methods.add(setterNodeMs);
   }
 
-  @SuppressWarnings("unchecked")
-  private static void addIntGetterNSetter(ClassNode classNode, String fieldName, String fieldNameForMethods,
-      String fieldJavaType, boolean isBoolean)
-  {
-    // Create getter
-    String getterSignature = "()" + fieldJavaType;
-    MethodNode getterNode = new MethodNode(Opcodes.ACC_PUBLIC, (isBoolean ? "is" : "get") + fieldNameForMethods,
-        getterSignature, null, null);
-    getterNode.instructions.add(new VarInsnNode(Opcodes.ALOAD, 0));
-    getterNode.instructions.add(new FieldInsnNode(Opcodes.GETFIELD, classNode.name, fieldName, fieldJavaType));
-    getterNode.instructions.add(new InsnNode(Opcodes.IRETURN));
-    classNode.methods.add(getterNode);
-
-    // Create setter
-    String setterSignature = '(' + fieldJavaType + ')' + 'V';
-    MethodNode setterNode = new MethodNode(Opcodes.ACC_PUBLIC, "set" + fieldNameForMethods, setterSignature, null,
-        null);
-    setterNode.instructions.add(new VarInsnNode(Opcodes.ALOAD, 0));
-    setterNode.instructions.add(new VarInsnNode(Opcodes.ILOAD, 1));
-    setterNode.instructions.add(new FieldInsnNode(Opcodes.PUTFIELD, classNode.name, fieldName, fieldJavaType));
-    setterNode.instructions.add(new InsnNode(Opcodes.RETURN));
-    classNode.methods.add(setterNode);
-  }
-
-  @SuppressWarnings("unchecked")
-  private static void addLongGetterNSetter(ClassNode classNode, String fieldName, String fieldNameForMethods,
-      String fieldJavaType)
-  {
-    // Create getter
-    String getterSignature = "()" + fieldJavaType;
-    MethodNode getterNode = new MethodNode(Opcodes.ACC_PUBLIC, "get" + fieldNameForMethods, getterSignature, null,
-        null);
-    getterNode.instructions.add(new VarInsnNode(Opcodes.ALOAD, 0));
-    getterNode.instructions.add(new FieldInsnNode(Opcodes.GETFIELD, classNode.name, fieldName, fieldJavaType));
-    getterNode.instructions.add(new InsnNode(Opcodes.LRETURN));
-    classNode.methods.add(getterNode);
-
-    // Create setter
-    String setterSignature = '(' + fieldJavaType + ')' + 'V';
-    MethodNode setterNode = new MethodNode(Opcodes.ACC_PUBLIC, "set" + fieldNameForMethods, setterSignature, null,
-        null);
-    setterNode.instructions.add(new VarInsnNode(Opcodes.ALOAD, 0));
-    setterNode.instructions.add(new VarInsnNode(Opcodes.LLOAD, 1));
-    setterNode.instructions.add(new FieldInsnNode(Opcodes.PUTFIELD, classNode.name, fieldName, fieldJavaType));
-    setterNode.instructions.add(new InsnNode(Opcodes.RETURN));
-    classNode.methods.add(setterNode);
-  }
-
-  @SuppressWarnings("unchecked")
-  private static void addFloatGetterNSetter(ClassNode classNode, String fieldName, String fieldNameForMethods,
-      String fieldJavaType)
-  {
-    // Create getter
-    String getterSignature = "()" + fieldJavaType;
-    MethodNode getterNode = new MethodNode(Opcodes.ACC_PUBLIC, "get" + fieldNameForMethods, getterSignature, null,
-        null);
-    getterNode.instructions.add(new VarInsnNode(Opcodes.ALOAD, 0));
-    getterNode.instructions.add(new FieldInsnNode(Opcodes.GETFIELD, classNode.name, fieldName, fieldJavaType));
-    getterNode.instructions.add(new InsnNode(Opcodes.FRETURN));
-    classNode.methods.add(getterNode);
-
-    // Create setter
-    String setterSignature = '(' + fieldJavaType + ')' + 'V';
-    MethodNode setterNode = new MethodNode(Opcodes.ACC_PUBLIC, "set" + fieldNameForMethods, setterSignature, null,
-        null);
-    setterNode.instructions.add(new VarInsnNode(Opcodes.ALOAD, 0));
-    setterNode.instructions.add(new VarInsnNode(Opcodes.FLOAD, 1));
-    setterNode.instructions.add(new FieldInsnNode(Opcodes.PUTFIELD, classNode.name, fieldName, fieldJavaType));
-    setterNode.instructions.add(new InsnNode(Opcodes.RETURN));
-    classNode.methods.add(setterNode);
-  }
-
-  @SuppressWarnings("unchecked")
-  private static void addDoubleGetterNSetter(ClassNode classNode, String fieldName, String fieldNameForMethods,
-      String fieldJavaType)
-  {
-    // Create getter
-    String getterSignature = "()" + fieldJavaType;
-    MethodNode getterNode = new MethodNode(Opcodes.ACC_PUBLIC, "get" + fieldNameForMethods, getterSignature, null,
-        null);
-    getterNode.instructions.add(new VarInsnNode(Opcodes.ALOAD, 0));
-    getterNode.instructions.add(new FieldInsnNode(Opcodes.GETFIELD, classNode.name, fieldName, fieldJavaType));
-    getterNode.instructions.add(new InsnNode(Opcodes.DRETURN));
-    classNode.methods.add(getterNode);
-
-    // Create setter
-    String setterSignature = '(' + fieldJavaType + ')' + 'V';
-    MethodNode setterNode = new MethodNode(Opcodes.ACC_PUBLIC, "set" + fieldNameForMethods, setterSignature, null,
-        null);
-    setterNode.instructions.add(new VarInsnNode(Opcodes.ALOAD, 0));
-    setterNode.instructions.add(new VarInsnNode(Opcodes.DLOAD, 1));
-    setterNode.instructions.add(new FieldInsnNode(Opcodes.PUTFIELD, classNode.name, fieldName, fieldJavaType));
-    setterNode.instructions.add(new InsnNode(Opcodes.RETURN));
-    classNode.methods.add(setterNode);
-  }
-
-  @SuppressWarnings("unchecked")
-  private static void addObjectGetterNSetter(ClassNode classNode, String fieldName, String fieldNameForMethods,
-      String fieldJavaType)
-  {
-    // Create getter
-    String getterSignature = "()" + fieldJavaType;
-    MethodNode getterNode = new MethodNode(Opcodes.ACC_PUBLIC, "get" + fieldNameForMethods, getterSignature, null,
-        null);
-    getterNode.instructions.add(new VarInsnNode(Opcodes.ALOAD, 0));
-    getterNode.instructions.add(new FieldInsnNode(Opcodes.GETFIELD, classNode.name, fieldName, fieldJavaType));
-    getterNode.instructions.add(new InsnNode(Opcodes.ARETURN));
-    classNode.methods.add(getterNode);
-
-    // Create setter
-    String setterSignature = '(' + fieldJavaType + ')' + 'V';
-    MethodNode setterNode = new MethodNode(Opcodes.ACC_PUBLIC, "set" + fieldNameForMethods, setterSignature, null,
-        null);
-    setterNode.instructions.add(new VarInsnNode(Opcodes.ALOAD, 0));
-    setterNode.instructions.add(new VarInsnNode(Opcodes.ALOAD, 1));
-    setterNode.instructions.add(new FieldInsnNode(Opcodes.PUTFIELD, classNode.name, fieldName, fieldJavaType));
-    setterNode.instructions.add(new InsnNode(Opcodes.RETURN));
-    classNode.methods.add(setterNode);
-  }
-
   /**
    * Adds a toString method to underlying class. Uses StringBuilder to generate the final string.
    *
@@ -488,7 +447,8 @@ public class BeanClassGenerator
       toStringNode.instructions.add(new FieldInsnNode(Opcodes.GETFIELD, classNode.name, fieldName, fieldJavaType));
 
       // There is no StringBuilder.append method for short and byte. It takes it as int.
-      if (fieldJavaType.equals("S") || fieldJavaType.equals("B")) {
+      if (fieldJavaType.equals(Character.toString(typeIdentifierShort)) ||
+          fieldJavaType.equals(Character.toString(typeIdentifierByte))) {
         fieldJavaType = "I";
       }
 
@@ -757,40 +717,4 @@ public class BeanClassGenerator
     //non-primitive so find the internal name of the class.
     return 'L' + fieldType.replace('.', '/') + ';';
   }
-
-  /**
-   * Given the class name it reads and loads the class from the input stream.
-   *
-   * @param fqcn        fully qualified class name.
-   * @param inputStream stream from which class is read.
-   * @return loaded class
-   * @throws IOException
-   */
-  public static Class<?> readBeanClass(String fqcn, FSDataInputStream inputStream) throws IOException
-  {
-    byte[] bytes = IOUtils.toByteArray(inputStream);
-    inputStream.close();
-    return new ByteArrayClassLoader().defineClass(fqcn, bytes);
-  }
-
-  /**
-   * Given the class name it reads and loads the class from given byte array.
-   *
-   * @param fqcn       fully qualified class name.
-   * @param inputClass byte[] from which class is read.
-   * @return loaded class
-   * @throws IOException
-   */
-  public static Class<?> readBeanClass(String fqcn, byte[] inputClass) throws IOException
-  {
-    return new ByteArrayClassLoader().defineClass(fqcn, inputClass);
-  }
-
-  private static class ByteArrayClassLoader extends ClassLoader
-  {
-    Class<?> defineClass(String name, byte[] ba)
-    {
-      return defineClass(name, ba, 0, ba.length);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/codegen/ExpressionCompiler.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/codegen/ExpressionCompiler.java b/sql/src/main/java/org/apache/apex/malhar/sql/codegen/ExpressionCompiler.java
new file mode 100644
index 0000000..e12ff53
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/codegen/ExpressionCompiler.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.codegen;
+
+import java.util.List;
+import java.util.regex.Matcher;
+
+import org.apache.apex.malhar.sql.operators.OperatorUtils;
+import org.apache.calcite.adapter.enumerable.JavaRowFormat;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.BlockStatement;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.Statement;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.util.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Converts calcite expression of type {@link RexNode} to quasi-Java expression which can be used
+ * with {@link com.datatorrent.lib.util.PojoUtils}
+ */
+@InterfaceStability.Evolving
+public class ExpressionCompiler
+{
+  private final RexBuilder rexBuilder;
+
+  public ExpressionCompiler(RexBuilder rexBuilder)
+  {
+    this.rexBuilder = rexBuilder;
+  }
+
+  /**
+   * Create quasi-Java expression from given {@link RexNode}
+   *
+   * @param node Expression in the form of {@link RexNode}
+   * @param inputRowType Input Data type to expression in the form of {@link RelDataType}
+   * @param outputRowType Output data type of expression in the form of {@link RelDataType}
+   *
+   * @return Returns quasi-Java expression
+   */
+  public String getExpression(RexNode node, RelDataType inputRowType, RelDataType outputRowType)
+  {
+    final RexProgramBuilder programBuilder = new RexProgramBuilder(inputRowType, rexBuilder);
+    programBuilder.addProject(node, null);
+    final RexProgram program = programBuilder.getProgram();
+
+    final BlockBuilder builder = new BlockBuilder();
+    final JavaTypeFactory javaTypeFactory = (JavaTypeFactory)rexBuilder.getTypeFactory();
+
+    final RexToLixTranslator.InputGetter inputGetter = new RexToLixTranslator.InputGetterImpl(ImmutableList
+        .of(Pair.<Expression, PhysType>of(Expressions.variable(Object[].class, "inputValues"),
+        PhysTypeImpl.of(javaTypeFactory, inputRowType, JavaRowFormat.ARRAY, false))));
+    final Function1<String, RexToLixTranslator.InputGetter> correlates =
+        new Function1<String, RexToLixTranslator.InputGetter>()
+      {
+        public RexToLixTranslator.InputGetter apply(String a0)
+        {
+          throw new UnsupportedOperationException();
+        }
+      };
+
+    final List<Expression> list = RexToLixTranslator.translateProjects(program, javaTypeFactory, builder,
+        PhysTypeImpl.of(javaTypeFactory, outputRowType, JavaRowFormat.ARRAY, false), null, inputGetter, correlates);
+
+    for (int i = 0; i < list.size(); i++) {
+      Statement statement = Expressions.statement(list.get(i));
+      builder.add(statement);
+    }
+
+    return finalizeExpression(builder.toBlock(), inputRowType);
+  }
+
+  private String finalizeExpression(BlockStatement blockStatement, RelDataType inputRowType)
+  {
+    String s = Expressions.toString(blockStatement.statements.get(0));
+    int idx = 0;
+    for (RelDataTypeField field : inputRowType.getFieldList()) {
+      String fieldName = OperatorUtils.getFieldName(field);
+      s = s.replaceAll(String.format("inputValues\\[%d\\]", idx++), "\\{\\$." + Matcher.quoteReplacement(fieldName) + "\\}");
+    }
+
+    return s.substring(0, s.length() - 2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/operators/FilterTransformOperator.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/operators/FilterTransformOperator.java b/sql/src/main/java/org/apache/apex/malhar/sql/operators/FilterTransformOperator.java
new file mode 100644
index 0000000..192c58b
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/operators/FilterTransformOperator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.operators;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.expression.Expression;
+import com.datatorrent.lib.transform.TransformOperator;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * This is an extension of {@link TransformOperator} which also takes care of filtering tuples.
+ */
+@InterfaceStability.Evolving
+public class FilterTransformOperator extends TransformOperator
+{
+  private String condition;
+  private transient Expression conditionExpression;
+
+  @Override
+  public void activate(Context context)
+  {
+    super.activate(context);
+    if (condition != null) {
+      conditionExpression = PojoUtils.createExpression(inputClass, condition, Boolean.class,
+        expressionFunctions.toArray(new String[expressionFunctions.size()]));
+    }
+  }
+
+  @Override
+  protected void processTuple(Object in)
+  {
+    if ((conditionExpression != null) && (conditionExpression.execute(in) == Boolean.FALSE)) {
+      return;
+    }
+
+    super.processTuple(in);
+  }
+
+  public String getCondition()
+  {
+    return condition;
+  }
+
+  /**
+   * Set quasi-Java expression which acts as filtering logic for given tuple
+   *
+   * @param condition Expression which can be evaluated for filtering
+   */
+  public void setCondition(String condition)
+  {
+    this.condition = condition;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/operators/InnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/operators/InnerJoinOperator.java b/sql/src/main/java/org/apache/apex/malhar/sql/operators/InnerJoinOperator.java
new file mode 100644
index 0000000..0b21898
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/operators/InnerJoinOperator.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.operators;
+
+import org.apache.apex.malhar.lib.join.POJOInnerJoinOperator;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+
+/**
+ * This is an extension of {@link POJOInnerJoinOperator} operator which works over a global scope and
+ * does not have time bound expiry of join tuples.
+ */
+@InterfaceStability.Evolving
+public class InnerJoinOperator extends POJOInnerJoinOperator
+{
+  private long time = System.currentTimeMillis();
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    this.setExpiryTime(1L);
+    // Number of buckets is set to 47000 because this is rounded number closer to sqrt of MAXINT. This guarantees
+    // even distribution of keys across buckets.
+    this.setNoOfBuckets(47000);
+    this.setTimeFieldsStr("");
+    super.setup(context);
+  }
+
+  @Override
+  public long extractTime(Object tuple, boolean isStream1Data)
+  {
+    /**
+     * Return extract time which is always more than time when the operator is started.
+     */
+    return time + 3600000L;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/operators/LineReader.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/operators/LineReader.java b/sql/src/main/java/org/apache/apex/malhar/sql/operators/LineReader.java
new file mode 100644
index 0000000..b0225f0
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/operators/LineReader.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.operators;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
+
+/**
+ * This operator reads data from given file/folder in line by line fashion.
+ */
+@InterfaceStability.Evolving
+public class LineReader extends AbstractFileInputOperator<byte[]>
+{
+  public final transient DefaultOutputPort<byte[]> output = new DefaultOutputPort<>();
+
+  protected transient BufferedReader br;
+
+  @Override
+  protected InputStream openFile(Path path) throws IOException
+  {
+    InputStream is = super.openFile(path);
+    br = new BufferedReader(new InputStreamReader(is));
+    return is;
+  }
+
+  @Override
+  protected void closeFile(InputStream is) throws IOException
+  {
+    super.closeFile(is);
+    br.close();
+    br = null;
+  }
+
+  @Override
+  protected byte[] readEntity() throws IOException
+  {
+    String s = br.readLine();
+    if (s != null) {
+      return s.getBytes();
+    }
+    return null;
+  }
+
+  @Override
+  protected void emit(byte[] tuple)
+  {
+    output.emit(tuple);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/operators/OperatorUtils.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/operators/OperatorUtils.java b/sql/src/main/java/org/apache/apex/malhar/sql/operators/OperatorUtils.java
new file mode 100644
index 0000000..e5a1928
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/operators/OperatorUtils.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.operators;
+
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class OperatorUtils
+{
+  private static int opCount = 1;
+  private static int streamCount = 1;
+
+  /**
+   * This method generates unique name for the operator.
+   *
+   * @param operatorType Base name of the operator.
+   * @return Returns unique name of the operator.
+   */
+  public static String getUniqueOperatorName(String operatorType)
+  {
+    return operatorType + "_" + opCount++;
+  }
+
+  /**
+   * This method generates unique name for the stream using input and output.
+   *
+   * @param output Name of the output end of the stream
+   * @param input Name of the input end of the stream
+   * @return Returns unique name of the stream.
+   */
+  public static String getUniqueStreamName(String output, String input)
+  {
+    return output + "_" + input + "_" + streamCount++;
+  }
+
+  /**
+   * This method gives field name for POJO class for given {@link RelDataTypeField} object.
+   *
+   * @param field field object that needs to be converted to POJO class field name
+   * @return Return field name from POJO class
+   */
+  public static String getFieldName(RelDataTypeField field)
+  {
+    SqlTypeName sqlTypeName = field.getType().getSqlTypeName();
+    String name = getValidFieldName(field);
+
+    name = (sqlTypeName == SqlTypeName.TIMESTAMP) ?
+      (name + "Ms") :
+      ((sqlTypeName == SqlTypeName.DATE) ? (name + "Sec") : name);
+
+    return name;
+  }
+
+  public static String getValidFieldName(RelDataTypeField field)
+  {
+    String name = field.getName().replaceAll("\\W", "");
+    name = (name.length() == 0) ? "f0" : name;
+    return name;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/planner/ApexRelNode.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/planner/ApexRelNode.java b/sql/src/main/java/org/apache/apex/malhar/sql/planner/ApexRelNode.java
new file mode 100644
index 0000000..7a0d339
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/planner/ApexRelNode.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.planner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.sql.codegen.ExpressionCompiler;
+import org.apache.apex.malhar.sql.operators.FilterTransformOperator;
+import org.apache.apex.malhar.sql.operators.InnerJoinOperator;
+import org.apache.apex.malhar.sql.operators.OperatorUtils;
+import org.apache.apex.malhar.sql.schema.ApexSQLTable;
+import org.apache.apex.malhar.sql.schema.TupleSchemaRegistry;
+import org.apache.apex.malhar.sql.table.Endpoint;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.stream.Delta;
+import org.apache.calcite.rel.stream.LogicalDelta;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * This class defines how to populate DAG of Apex for the relational nodes of SQL Calcite
+ */
+@InterfaceStability.Evolving
+public abstract class ApexRelNode
+{
+  public static Map<Class, ApexRelNode> relNodeMapping = ImmutableMap.<Class, ApexRelNode>builder()
+      .put(LogicalDelta.class, new ApexDeltaRel())
+      .put(LogicalTableScan.class, new ApexTableScanRel())
+      .put(LogicalTableModify.class, new ApexTableModifyRel())
+      .put(LogicalProject.class, new ApexProjectRel())
+      .put(LogicalFilter.class, new ApexFilterRel())
+      .put(LogicalJoin.class, new ApexJoinRel())
+      .build();
+
+  public abstract RelInfo visit(RelContext context, RelNode node, List<RelInfo> inputStreams);
+
+  public static class RelContext
+  {
+    public DAG dag;
+    public JavaTypeFactory typeFactory;
+    public TupleSchemaRegistry schemaRegistry;
+
+    public RelContext(DAG dag, JavaTypeFactory typeFactory, TupleSchemaRegistry registry)
+    {
+      this.dag = dag;
+      this.typeFactory = typeFactory;
+      this.schemaRegistry = registry;
+    }
+  }
+
+  /**
+   * This is visitor for {@link Delta} to emit the data to {@link ConsoleOutputOperator}.
+   */
+  private static class ApexDeltaRel extends ApexRelNode
+  {
+    @Override
+    public RelInfo visit(RelContext context, RelNode node, List<RelInfo> inputStreams)
+    {
+      Delta delta = (Delta)node;
+
+      ConsoleOutputOperator console = context.dag
+          .addOperator(OperatorUtils.getUniqueOperatorName(delta.getRelTypeName()), ConsoleOutputOperator.class);
+      console.setStringFormat("Delta Record: %s");
+
+      return new RelInfo("Delta", Lists.<Operator.InputPort>newArrayList(console.input), console, null,
+          delta.getRowType());
+    }
+  }
+
+  /**
+   * This is visitor for {@link TableScan} for adding operators to DAG.
+   */
+  private static class ApexTableScanRel extends ApexRelNode
+  {
+    @Override
+    public RelInfo visit(RelContext context, RelNode node, List<RelInfo> inputStreams)
+    {
+      TableScan scan = (TableScan)node;
+      ApexSQLTable table = scan.getTable().unwrap(ApexSQLTable.class);
+      Endpoint endpoint = table.getEndpoint();
+      return endpoint.populateInputDAG(context.dag, context.typeFactory);
+    }
+  }
+
+  /**
+   * This is visitor for {@link TableModify} for adding operators to DAG.
+   */
+  private static class ApexTableModifyRel extends ApexRelNode
+  {
+    @Override
+    public RelInfo visit(RelContext context, RelNode node, List<RelInfo> inputStreams)
+    {
+      /**
+       * Only INSERT is allowed as it representation destination for DAG processing. Other types like UPDATE, DELETE,
+       * MERGE does not represent the same.
+       */
+
+      TableModify modify = (TableModify)node;
+      Preconditions.checkArgument(modify.isInsert(), "Only INSERT allowed for table modify");
+
+      ApexSQLTable table = modify.getTable().unwrap(ApexSQLTable.class);
+
+      Endpoint endpoint = table.getEndpoint();
+      return endpoint.populateOutputDAG(context.dag, context.typeFactory);
+    }
+  }
+
+  /**
+   * This is visitor for {@link Project} for adding operators to DAG.
+   */
+  private static class ApexProjectRel extends ApexRelNode
+  {
+    @Override
+    public RelInfo visit(RelContext context, RelNode node, List<RelInfo> inputStreams)
+    {
+      Project project = (Project)node;
+      if (inputStreams.size() == 0 || inputStreams.size() > 1) {
+        throw new UnsupportedOperationException("Project is a SingleRel");
+      }
+
+      FilterTransformOperator operator = context.dag
+          .addOperator(OperatorUtils.getUniqueOperatorName(project.getRelTypeName()), FilterTransformOperator.class);
+      Map<String, String> expMap = new HashMap<>();
+      ExpressionCompiler compiler = new ExpressionCompiler(new RexBuilder(project.getCluster().getTypeFactory()));
+
+      for (Pair<RelDataTypeField, RexNode> pair : Pair.zip(project.getRowType().getFieldList(),
+          project.getProjects())) {
+        String fieldName = OperatorUtils.getFieldName(pair.left);
+        String expression = compiler.getExpression(pair.right, project.getInput().getRowType(), project.getRowType());
+        expMap.put(fieldName, expression);
+      }
+      operator.setExpressionMap(expMap);
+
+      return new RelInfo("Project", Lists.<Operator.InputPort>newArrayList(operator.input), operator, operator.output,
+          project.getRowType());
+    }
+  }
+
+  /**
+   * This is visitor for {@link Filter} for adding operators to DAG.
+   */
+  private static class ApexFilterRel extends ApexRelNode
+  {
+    @Override
+    public RelInfo visit(RelContext context, RelNode node, List<RelInfo> inputStreams)
+    {
+      Filter filter = (Filter)node;
+      if (inputStreams.size() == 0 || inputStreams.size() > 1) {
+        throw new UnsupportedOperationException("Filter is a SingleRel");
+      }
+
+      FilterTransformOperator operator = context.dag
+          .addOperator(OperatorUtils.getUniqueOperatorName(filter.getRelTypeName()), FilterTransformOperator.class);
+      ExpressionCompiler compiler = new ExpressionCompiler(new RexBuilder(filter.getCluster().getTypeFactory()));
+      String expression = compiler.getExpression(filter.getCondition(), filter.getInput().getRowType(),
+          filter.getRowType());
+
+      Map<String, String> expMap = new HashMap<>();
+      for (Pair<RelDataTypeField, RelDataTypeField> pair : Pair.zip(filter.getInput().getRowType().getFieldList(),
+          filter.getRowType().getFieldList())) {
+        String leftName = OperatorUtils.getFieldName(pair.left);
+        String rightName = OperatorUtils.getFieldName(pair.right);
+        expMap.put(leftName, rightName);
+      }
+      operator.setExpressionMap(expMap);
+      operator.setCondition(expression);
+
+      return new RelInfo("Filter", Lists.<Operator.InputPort>newArrayList(operator.input), operator, operator.output,
+          filter.getRowType());
+    }
+  }
+
+  /**
+   * This is visitor for {@link Join} for adding operators to DAG.
+   */
+  private static class ApexJoinRel extends ApexRelNode
+  {
+
+    @Override
+    public RelInfo visit(RelContext context, RelNode node, List<RelInfo> inputStreams)
+    {
+      Join join = (Join)node;
+      if (inputStreams.size() != 2) {
+        throw new UnsupportedOperationException("Join is a BiRel");
+      }
+
+      if ((join.getJoinType() == JoinRelType.FULL) || (join.getJoinType() == JoinRelType.LEFT) ||
+          (join.getJoinType() == JoinRelType.RIGHT)) {
+        throw new UnsupportedOperationException("Outer joins are not supported");
+      }
+
+      final List<Integer> leftKeys = new ArrayList<>();
+      final List<Integer> rightKeys = new ArrayList<>();
+
+      RexNode remaining =
+          RelOptUtil.splitJoinCondition(join.getLeft(), join.getRight(), join.getCondition(), leftKeys, rightKeys);
+
+      if (leftKeys.size() != rightKeys.size()) {
+        throw new RuntimeException("Unexpected condition reached. Left and right condition count should be same");
+      }
+
+      if (leftKeys.size() == 0) {
+        throw new UnsupportedOperationException("Theta joins are not supported.");
+      }
+
+      RelInfo relInfo = addInnerJoinOperator(join, leftKeys, rightKeys, context);
+
+      if (!remaining.isAlwaysTrue()) {
+        relInfo = addJoinFilter(join, remaining, relInfo, context);
+      }
+
+      return relInfo;
+    }
+
+    private RelInfo addJoinFilter(Join join, RexNode remaining, RelInfo relInfo, RelContext context)
+    {
+      FilterTransformOperator operator = context.dag
+          .addOperator(OperatorUtils.getUniqueOperatorName(join.getRelTypeName() + "_Filter"),
+          FilterTransformOperator.class);
+      ExpressionCompiler compiler = new ExpressionCompiler(new RexBuilder(join.getCluster().getTypeFactory()));
+      String expression = compiler.getExpression(remaining, join.getRowType(), join.getRowType());
+
+      Map<String, String> expMap = new HashMap<>();
+      for (Pair<RelDataTypeField, RelDataTypeField> pair : Pair.zip(join.getRowType().getFieldList(),
+          join.getRowType().getFieldList())) {
+        String leftName = OperatorUtils.getFieldName(pair.left);
+        String rightName = OperatorUtils.getFieldName(pair.right);
+        expMap.put(leftName, rightName);
+      }
+      operator.setExpressionMap(expMap);
+      operator.setCondition(expression);
+
+      String streamName = OperatorUtils.getUniqueStreamName(join.getRelTypeName() + "_Join", join.getRelTypeName() +
+          "_Filter");
+      Class schema = TupleSchemaRegistry.getSchemaForRelDataType(context.schemaRegistry, streamName,
+          relInfo.getOutRelDataType());
+      context.dag.setOutputPortAttribute(relInfo.getOutPort(), Context.PortContext.TUPLE_CLASS, schema);
+      context.dag.setInputPortAttribute(operator.input, Context.PortContext.TUPLE_CLASS, schema);
+      context.dag.addStream(streamName, relInfo.getOutPort(), operator.input);
+
+      return new RelInfo("Join", relInfo.getInputPorts(), operator, operator.output, join.getRowType());
+    }
+
+    private RelInfo addInnerJoinOperator(Join join, List<Integer> leftKeys, List<Integer> rightKeys, RelContext context)
+    {
+      String leftKeyExpression = null;
+      String rightKeyExpression = null;
+      for (Integer leftKey : leftKeys) {
+        String name = OperatorUtils.getValidFieldName(join.getLeft().getRowType().getFieldList().get(leftKey));
+        leftKeyExpression = (leftKeyExpression == null) ? name : leftKeyExpression + " + " + name;
+      }
+
+      for (Integer rightKey : rightKeys) {
+        String name = OperatorUtils.getValidFieldName(join.getRight().getRowType().getFieldList().get(rightKey));
+        rightKeyExpression = (rightKeyExpression == null) ? name : rightKeyExpression + " + " + name;
+      }
+
+      String includeFieldStr = "";
+      boolean first = true;
+      for (RelDataTypeField field : join.getLeft().getRowType().getFieldList()) {
+        if (first) {
+          first = false;
+        } else {
+          includeFieldStr += ",";
+        }
+        includeFieldStr += OperatorUtils.getValidFieldName(field);
+      }
+      includeFieldStr += ";";
+      first = true;
+      for (RelDataTypeField field : join.getRight().getRowType().getFieldList()) {
+        if (first) {
+          first = false;
+        } else {
+          includeFieldStr += ",";
+        }
+        includeFieldStr += OperatorUtils.getValidFieldName(field);
+      }
+
+      InnerJoinOperator innerJoin = context.dag.addOperator(OperatorUtils.getUniqueOperatorName(join.getRelTypeName()),
+          InnerJoinOperator.class);
+      innerJoin.setExpiryTime(1L);
+      // Number of buckets is set to 47000 because this is rounded number closer to sqrt of MAXINT. This guarantees
+      // even distribution of keys across buckets.
+      innerJoin.setNoOfBuckets(47000);
+      innerJoin.setTimeFieldsStr("");
+
+      innerJoin.setLeftKeyExpression(leftKeyExpression);
+      innerJoin.setRightKeyExpression(rightKeyExpression);
+
+      innerJoin.setIncludeFieldStr(includeFieldStr);
+
+      return new RelInfo("Join", Lists.<Operator.InputPort>newArrayList(innerJoin.input1, innerJoin.input2), innerJoin,
+          innerJoin.outputPort, join.getRowType());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/planner/RelInfo.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/planner/RelInfo.java b/sql/src/main/java/org/apache/apex/malhar/sql/planner/RelInfo.java
new file mode 100644
index 0000000..29948ec
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/planner/RelInfo.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.planner;
+
+import java.util.List;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Operator;
+
+/**
+ * This object communicates stream and connection data between various stages of relational algebra.
+ */
+@InterfaceStability.Evolving
+public class RelInfo
+{
+  private List<Operator.InputPort> inputPorts;
+  private Operator operator;
+  private Operator.OutputPort outPort;
+  private RelDataType outRelDataType;
+  private Class clazz;
+  private String relName;
+
+  public RelInfo(String relName, List<Operator.InputPort> inputPorts, Operator operator, Operator.OutputPort outPort,
+      RelDataType outRelDataType)
+  {
+    this.inputPorts = inputPorts;
+    this.relName = relName;
+    this.operator = operator;
+    this.outPort = outPort;
+    this.outRelDataType = outRelDataType;
+    this.clazz = null;
+  }
+
+  public RelInfo(String relName, List<Operator.InputPort> inputPorts, Operator operator, Operator.OutputPort outPort, Class clazz)
+  {
+    this.inputPorts = inputPorts;
+    this.operator = operator;
+    this.outPort = outPort;
+    this.clazz = clazz;
+    this.relName = relName;
+    this.outRelDataType = null;
+  }
+
+  public Class getClazz()
+  {
+    return clazz;
+  }
+
+  public void setClazz(Class clazz)
+  {
+    this.clazz = clazz;
+  }
+
+  public List<Operator.InputPort> getInputPorts()
+  {
+    return inputPorts;
+  }
+
+  public void setInputPorts(List<Operator.InputPort> inputPorts)
+  {
+    this.inputPorts = inputPorts;
+  }
+
+  public String getRelName()
+  {
+    return relName;
+  }
+
+  public void setRelName(String relName)
+  {
+    this.relName = relName;
+  }
+
+  public Operator getOperator()
+  {
+    return operator;
+  }
+
+  public void setOperator(Operator operator)
+  {
+    this.operator = operator;
+  }
+
+  public Operator.OutputPort getOutPort()
+  {
+    return outPort;
+  }
+
+  public void setOutPort(Operator.OutputPort outPort)
+  {
+    this.outPort = outPort;
+  }
+
+  public RelDataType getOutRelDataType()
+  {
+    return outRelDataType;
+  }
+
+  public void setOutRelDataType(RelDataType outRelDataType)
+  {
+    this.outRelDataType = outRelDataType;
+  }
+}