You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/11/09 16:31:07 UTC

[2/2] samza git commit: SAMZA-1616: Samza-Sql - Support remote table for stream-table join

SAMZA-1616: Samza-Sql - Support remote table for stream-table join

Author: Aditya Toomula <at...@linkedin.com>

Reviewers: Srinivasulu Punuru <sp...@linkedin.com>

Closes #794 from atoomula/remote


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/12f421ce
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/12f421ce
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/12f421ce

Branch: refs/heads/master
Commit: 12f421ce212949ff3152dc9e0f1e4f38233dc5b8
Parents: 3da75e6
Author: Aditya Toomula <at...@linkedin.com>
Authored: Fri Nov 9 08:30:59 2018 -0800
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Fri Nov 9 08:30:59 2018 -0800

----------------------------------------------------------------------
 .../apache/samza/sql/avro/AvroRelConverter.java |  30 +--
 .../interfaces/SamzaRelTableKeyConverter.java   |  39 ++++
 .../SamzaRelTableKeyConverterFactory.java       |  39 ++++
 .../samza/sql/interfaces/SqlIOConfig.java       |  19 ++
 .../samza/sql/runner/SamzaSqlApplication.java   |  15 +-
 .../sql/runner/SamzaSqlApplicationConfig.java   |  19 +-
 .../samza/sql/translator/JoinInputNode.java     |  76 ++++++
 .../samza/sql/translator/JoinTranslator.java    | 161 ++++++++-----
 .../samza/sql/translator/QueryTranslator.java   |   9 +-
 .../SamzaSqlLocalTableJoinFunction.java         |  54 +++++
 .../SamzaSqlRelMessageJoinFunction.java         | 126 ----------
 .../SamzaSqlRemoteTableJoinFunction.java        |  79 +++++++
 .../translator/SamzaSqlTableJoinFunction.java   | 121 ++++++++++
 .../samza/sql/translator/ScanTranslator.java    |  12 +
 .../samza/sql/translator/TranslatorContext.java |  13 +-
 .../samza/sql/e2e/TestSamzaSqlRemoteTable.java  | 115 +++++++++
 .../apache/samza/sql/e2e/TestSamzaSqlTable.java |  68 ------
 .../samza/sql/system/TestAvroSystemFactory.java |  27 ++-
 .../RemoteStoreIOResolverTestFactory.java       | 147 ++++++++++++
 .../testutil/SampleRelTableKeyConverter.java    |  36 +++
 .../SampleRelTableKeyConverterFactory.java      |  41 ++++
 .../samza/sql/testutil/SamzaSqlTestConfig.java  |  31 ++-
 .../sql/testutil/TestIOResolverFactory.java     | 234 -------------------
 .../sql/translator/TestJoinTranslator.java      |  60 +++--
 .../sql/translator/TestQueryTranslator.java     |  40 ++--
 .../TestSamzaSqlLocalTableJoinFunction.java     | 161 +++++++++++++
 .../TestSamzaSqlRelMessageJoinFunction.java     | 121 ----------
 .../TestSamzaSqlRemoteTableJoinFunction.java    | 103 ++++++++
 .../test/samzasql/TestSamzaSqlEndToEnd.java     |  61 +++++
 29 files changed, 1356 insertions(+), 701 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index 89026ee..e1e1660 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -145,7 +145,7 @@ public class AvroRelConverter implements SamzaRelConverter {
     return new KV<>(relMessage.getKey(), convertToGenericRecord(relMessage.getSamzaSqlRelRecord(), payloadSchema));
   }
 
-  private GenericRecord convertToGenericRecord(SamzaSqlRelRecord relRecord, Schema schema) {
+  static public GenericRecord convertToGenericRecord(SamzaSqlRelRecord relRecord, Schema schema) {
     GenericRecord record = new GenericData.Record(schema);
     List<String> fieldNames = relRecord.getFieldNames();
     List<Object> values = relRecord.getFieldValues();
@@ -160,7 +160,7 @@ public class AvroRelConverter implements SamzaRelConverter {
     return record;
   }
 
-  public Object convertToAvroObject(Object relObj, Schema schema) {
+  static public Object convertToAvroObject(Object relObj, Schema schema) {
     if (relObj == null) {
       return null;
     }
@@ -190,6 +190,19 @@ public class AvroRelConverter implements SamzaRelConverter {
     }
   }
 
+  // Two non-nullable types in a union is not yet supported.
+  static public Schema getNonNullUnionSchema(Schema schema) {
+    if (schema.getType().equals(Schema.Type.UNION)) {
+      if (schema.getTypes().get(0).getType() != Schema.Type.NULL) {
+        return schema.getTypes().get(0);
+      }
+      if (schema.getTypes().get(1).getType() != Schema.Type.NULL) {
+        return schema.getTypes().get(1);
+      }
+    }
+    return schema;
+  }
+
   // Not doing any validations of data types with Avro schema considering the resource cost per message.
   // Casting would fail if the data types are not in sync with the schema.
   public Object convertToJavaObject(Object avroObj, Schema schema) {
@@ -238,17 +251,4 @@ public class AvroRelConverter implements SamzaRelConverter {
         return avroObj;
     }
   }
-
-  // Two non-nullable types in a union is not yet supported.
-  public Schema getNonNullUnionSchema(Schema schema) {
-    if (schema.getType().equals(Schema.Type.UNION)) {
-      if (schema.getTypes().get(0).getType() != Schema.Type.NULL) {
-        return schema.getTypes().get(0);
-      }
-      if (schema.getTypes().get(1).getType() != Schema.Type.NULL) {
-        return schema.getTypes().get(1);
-      }
-    }
-    return schema;
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelTableKeyConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelTableKeyConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelTableKeyConverter.java
new file mode 100644
index 0000000..8af34f7
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelTableKeyConverter.java
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.sql.SamzaSqlRelRecord;
+
+
+/**
+ * {@link org.apache.samza.sql.translator.SamzaSqlRemoteTableJoinFunction} uses {@link SamzaRelTableKeyConverter}
+ * to convert the key to the format expected by the remote table system before doing the table lookup.
+ *
+ * The {@link SamzaRelTableKeyConverter} is configurable at a system level, so it is possible to configure different
+ * {@link SamzaRelTableKeyConverter} for different remote table systems.
+ */
+public interface SamzaRelTableKeyConverter {
+  /**
+   * Convert the key in relational record format to the format expected by remote table.
+   * @param relKeyRecord key relational record that needs to be converted.
+   * @return the table key
+   */
+  Object convertToTableKeyFormat(SamzaSqlRelRecord relKeyRecord);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelTableKeyConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelTableKeyConverterFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelTableKeyConverterFactory.java
new file mode 100644
index 0000000..aab8410
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelTableKeyConverterFactory.java
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Factory that is used to create {@link SamzaRelTableKeyConverter}
+ */
+public interface SamzaRelTableKeyConverterFactory {
+
+  /**
+   * Create a {@link SamzaRelTableKeyConverter}. This method is called when the framework wants to create the
+   * {@link SamzaRelTableKeyConverter} corresponding to the system.
+   * @param systemStream the systemStream to create a key converter for
+   * @param config config that is used to create the object
+   * @return the object created.
+   */
+  SamzaRelTableKeyConverter create(SystemStream systemStream, Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
index 8636736..3ef1795 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang.Validate;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.system.SystemStream;
 
@@ -39,6 +40,7 @@ import org.apache.samza.system.SystemStream;
 public class SqlIOConfig {
 
   public static final String CFG_SAMZA_REL_CONVERTER = "samzaRelConverterName";
+  public static final String CFG_SAMZA_REL_TABLE_KEY_CONVERTER = "samzaRelTableKeyConverterName";
   public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName";
 
   private final String systemName;
@@ -46,6 +48,7 @@ public class SqlIOConfig {
   private final String streamName;
 
   private final String samzaRelConverterName;
+  private final String samzaRelTableKeyConverterName;
   private final SystemStream systemStream;
 
   private final String source;
@@ -79,6 +82,14 @@ public class SqlIOConfig {
     Validate.notEmpty(samzaRelConverterName,
         String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
 
+    if (isRemoteTable()) {
+      samzaRelTableKeyConverterName = streamConfigs.get(CFG_SAMZA_REL_TABLE_KEY_CONVERTER);
+      Validate.notEmpty(samzaRelTableKeyConverterName,
+          String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
+    } else {
+      samzaRelTableKeyConverterName = "";
+    }
+
     relSchemaProviderName = streamConfigs.get(CFG_REL_SCHEMA_PROVIDER);
 
     // Removing the Samza SQL specific configs to get the remaining Samza configs.
@@ -114,6 +125,10 @@ public class SqlIOConfig {
     return samzaRelConverterName;
   }
 
+  public String getSamzaRelTableKeyConverterName() {
+    return samzaRelTableKeyConverterName;
+  }
+
   public String getRelSchemaProviderName() {
     return relSchemaProviderName;
   }
@@ -133,4 +148,8 @@ public class SqlIOConfig {
   public Optional<TableDescriptor> getTableDescriptor() {
     return tableDescriptor;
   }
+
+  public boolean isRemoteTable() {
+    return tableDescriptor.isPresent() && tableDescriptor.get() instanceof RemoteTableDescriptor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
index 47d6cf0..b8bb190 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
@@ -25,13 +25,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
-import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.sql.translator.QueryTranslator;
 import org.apache.samza.sql.translator.TranslatorContext;
 import org.slf4j.Logger;
@@ -44,7 +42,6 @@ import org.slf4j.LoggerFactory;
 public class SamzaSqlApplication implements StreamApplication {
 
   private static final Logger LOG = LoggerFactory.getLogger(SamzaSqlApplication.class);
-  private AtomicInteger queryId = new AtomicInteger(0);
 
   @Override
   public void describe(StreamApplicationDescriptor appDescriptor) {
@@ -69,13 +66,13 @@ public class SamzaSqlApplication implements StreamApplication {
       // 3. Translate Calcite plan to Samza stream operators
       QueryTranslator queryTranslator = new QueryTranslator(appDescriptor, sqlConfig);
       SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(sqlConfig);
-      Map<String, SamzaRelConverter> converters = sqlConfig.getSamzaRelConverters();
+      int queryId = 0;
       for (RelRoot relRoot : relRoots) {
-        LOG.info("Translating relRoot {} to samza stream graph", relRoot);
-        int qId = queryId.incrementAndGet();
-        TranslatorContext translatorContext = new TranslatorContext(appDescriptor, relRoot, executionContext, converters);
-        translatorContextMap.put(qId, translatorContext);
-        queryTranslator.translate(relRoot, translatorContext, qId);
+        LOG.info("Translating relRoot {} to samza stream graph with queryId {}", relRoot, queryId);
+        TranslatorContext translatorContext = new TranslatorContext(appDescriptor, relRoot, executionContext);
+        translatorContextMap.put(queryId, translatorContext);
+        queryTranslator.translate(relRoot, translatorContext, queryId);
+        queryId++;
       }
 
       // 4. Set all translator contexts

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index dcb5043..6e12c02 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -47,6 +47,8 @@ import org.apache.samza.sql.interfaces.RelSchemaProvider;
 import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverterFactory;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
@@ -76,6 +78,7 @@ public class SamzaSqlApplicationConfig {
 
   public static final String CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN = "samza.sql.relSchemaProvider.%s.";
   public static final String CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN = "samza.sql.relConverter.%s.";
+  public static final String CFG_FMT_SAMZA_REL_TABLE_KEY_CONVERTER_DOMAIN = "samza.sql.relTableKeyConverter.%s.";
 
   public static final String CFG_IO_RESOLVER = "samza.sql.ioResolver";
   public static final String CFG_FMT_SOURCE_RESOLVER_DOMAIN = "samza.sql.ioResolver.%s.";
@@ -94,6 +97,7 @@ public class SamzaSqlApplicationConfig {
 
   private final Map<String, RelSchemaProvider> relSchemaProvidersBySource;
   private final Map<String, SamzaRelConverter> samzaRelConvertersBySource;
+  private final Map<String, SamzaRelTableKeyConverter> samzaRelTableKeyConvertersBySource;
 
   private SqlIOResolver ioResolver;
   private UdfResolver udfResolver;
@@ -135,6 +139,13 @@ public class SamzaSqlApplicationConfig {
                 CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, (o, c) -> ((SamzaRelConverterFactory) o).create(x.getSystemStream(),
                     relSchemaProvidersBySource.get(x.getSource()), c))));
 
+    samzaRelTableKeyConvertersBySource = systemStreamConfigs.stream()
+        .filter(config -> config.isRemoteTable())
+        .collect(Collectors.toMap(SqlIOConfig::getSource,
+            x -> initializePlugin("SamzaRelTableKeyConverter", x.getSamzaRelTableKeyConverterName(),
+                staticConfig, CFG_FMT_SAMZA_REL_TABLE_KEY_CONVERTER_DOMAIN,
+                (o, c) -> ((SamzaRelTableKeyConverterFactory) o).create(x.getSystemStream(), c))));
+
     udfResolver = createUdfResolver(staticConfig);
     udfMetadata = udfResolver.getUdfs();
 
@@ -287,12 +298,12 @@ public class SamzaSqlApplicationConfig {
     return samzaRelConvertersBySource;
   }
 
-  public Map<String, RelSchemaProvider> getRelSchemaProviders() {
-    return relSchemaProvidersBySource;
+  public Map<String, SamzaRelTableKeyConverter> getSamzaRelTableKeyConverters() {
+    return samzaRelTableKeyConvertersBySource;
   }
 
-  public SqlIOResolver getIoResolver() {
-    return ioResolver;
+  public Map<String, RelSchemaProvider> getRelSchemaProviders() {
+    return relSchemaProvidersBySource;
   }
 
   public String getMetadataTopicPrefix() {

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
new file mode 100644
index 0000000..d952194
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
@@ -0,0 +1,76 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.List;
+import org.apache.calcite.rel.RelNode;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+
+
+/**
+ * This class represents the input node for the join. It can be either a table or a stream.
+ */
+class JoinInputNode {
+
+  // Calcite RelNode corresponding to the input
+  private final RelNode relNode;
+  // Id of the key(s) in the fields that this input has
+  private final List<Integer> keyIds;
+
+  private final InputType inputType;
+  private final boolean isPosOnRight;
+
+  enum InputType {
+    STREAM,
+    LOCAL_TABLE,
+    REMOTE_TABLE
+  }
+
+  JoinInputNode(RelNode relNode, List<Integer> keyIds, InputType inputType, boolean isPosOnRight) {
+    this.relNode = relNode;
+    this.keyIds = keyIds;
+    this.inputType = inputType;
+    this.isPosOnRight = isPosOnRight;
+  }
+
+  boolean isRemoteTable() {
+    return this.inputType == InputType.REMOTE_TABLE;
+  }
+
+  List<Integer> getKeyIds() {
+    return keyIds;
+  }
+
+  List<String> getFieldNames() {
+    return relNode.getRowType().getFieldNames();
+  }
+
+  RelNode getRelNode() {
+    return relNode;
+  }
+
+  String getSourceName() {
+    return SqlIOConfig.getSourceFromSourceParts(relNode.getTable().getQualifiedName());
+  }
+
+  boolean isPosOnRight() {
+    return isPosOnRight;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 5f44ff9..113244f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -40,14 +40,15 @@ import org.apache.commons.lang.Validate;
 import org.apache.samza.SamzaException;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
 import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
 import org.apache.samza.table.Table;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,38 +60,40 @@ import static org.apache.samza.sql.data.SamzaSqlRelMessage.createSamzaSqlComposi
  * Translator to translate the LogicalJoin node in the relational graph to the corresponding StreamGraph
  * implementation.
  * Join is supported with the following caveats:
- *   0. Only local tables are supported. Remote/composite tables are not yet supported.
  *   1. Only stream-table joins are supported. No stream-stream joins.
  *   2. Only Equi-joins are supported. No theta-joins.
  *   3. Inner joins, Left and Right outer joins are supported. No cross joins, full outer joins or natural joins.
  *   4. Join condition with a constant is not supported.
  *   5. Compound join condition with only AND operator is supported. AND operator with a constant is not supported. No
  *      support for OR operator or any other operator in the join condition.
- *
- * It is assumed that the stream denoted as 'table' is already partitioned by the key(s) specified in the join
- * condition. We do not repartition the table as bootstrap semantic is not propagated to the intermediate streams.
- * Please refer SAMZA-1613 for more details on this. But we always repartition the stream by the key(s) specified in
- * the join condition.
+ * For local table, we always repartition both the stream to be joined and the stream denoted as table by the key(s)
+ * specified in the join condition.
  */
 class JoinTranslator {
 
   private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class);
   private int joinId;
-  private SqlIOResolver ioResolver;
   private final String intermediateStreamPrefix;
+  private final int queryId;
 
-  JoinTranslator(int joinId, SqlIOResolver ioResolver, String intermediateStreamPrefix) {
+  JoinTranslator(int joinId, String intermediateStreamPrefix, int queryId) {
     this.joinId = joinId;
-    this.ioResolver = ioResolver;
     this.intermediateStreamPrefix = intermediateStreamPrefix + (intermediateStreamPrefix.isEmpty() ? "" : "_");
+    this.queryId = queryId;
   }
 
   void translate(final LogicalJoin join, final TranslatorContext context) {
+    JoinInputNode.InputType inputTypeOnLeft = getInputType(join.getLeft(), context);
+    JoinInputNode.InputType inputTypeOnRight = getInputType(join.getRight(), context);
 
     // Do the validation of join query
-    validateJoinQuery(join);
+    validateJoinQuery(join, inputTypeOnLeft, inputTypeOnRight);
 
-    boolean isTablePosOnRight = isTable(join.getRight());
+    // At this point, one of the sides is a table. Let's figure out if it is on left or right side.
+    boolean isTablePosOnRight = (inputTypeOnRight != JoinInputNode.InputType.STREAM);
+
+    // stream and table keyIds are used to extract the join condition field (key) names and values out of the stream
+    // and table records.
     List<Integer> streamKeyIds = new LinkedList<>();
     List<Integer> tableKeyIds = new LinkedList<>();
 
@@ -98,25 +101,46 @@ class JoinTranslator {
     populateStreamAndTableKeyIds(((RexCall) join.getCondition()).getOperands(), join, isTablePosOnRight, streamKeyIds,
         tableKeyIds);
 
-    Table table = loadLocalTable(isTablePosOnRight, tableKeyIds, join, context);
+    // Get the two input nodes (stream and table nodes) for the join.
+    JoinInputNode streamNode = new JoinInputNode(isTablePosOnRight ? join.getLeft() : join.getRight(), streamKeyIds,
+        isTablePosOnRight ? inputTypeOnLeft : inputTypeOnRight, !isTablePosOnRight);
+    JoinInputNode tableNode = new JoinInputNode(isTablePosOnRight ? join.getRight() : join.getLeft(), tableKeyIds,
+        isTablePosOnRight ? inputTypeOnRight : inputTypeOnLeft, isTablePosOnRight);
+
+    MessageStream<SamzaSqlRelMessage> inputStream = context.getMessageStream(streamNode.getRelNode().getId());
+    Table table = getTable(tableNode, context);
+
+    MessageStream<SamzaSqlRelMessage> outputStream =
+        joinStreamWithTable(inputStream, table, streamNode, tableNode, join, context);
+
+    context.registerMessageStream(join.getId(), outputStream);
+  }
 
-    MessageStream<SamzaSqlRelMessage> inputStream =
-        isTablePosOnRight ?
-            context.getMessageStream(join.getLeft().getId()) : context.getMessageStream(join.getRight().getId());
+  private MessageStream<SamzaSqlRelMessage> joinStreamWithTable(MessageStream<SamzaSqlRelMessage> inputStream,
+      Table table, JoinInputNode streamNode, JoinInputNode tableNode, LogicalJoin join, TranslatorContext context) {
 
-    List<String> streamFieldNames =
-        new ArrayList<>((isTablePosOnRight ? join.getLeft() : join.getRight()).getRowType().getFieldNames());
-    List<String> tableFieldNames =
-        new ArrayList<>((isTablePosOnRight ? join.getRight() : join.getLeft()).getRowType().getFieldNames());
+    List<Integer> streamKeyIds = streamNode.getKeyIds();
+    List<Integer> tableKeyIds = tableNode.getKeyIds();
     Validate.isTrue(streamKeyIds.size() == tableKeyIds.size());
+
     log.info("Joining on the following Stream and Table field(s): ");
+    List<String> streamFieldNames = new ArrayList<>(streamNode.getFieldNames());
+    List<String> tableFieldNames = new ArrayList<>(tableNode.getFieldNames());
     for (int i = 0; i < streamKeyIds.size(); i++) {
       log.info(streamFieldNames.get(streamKeyIds.get(i)) + " with " + tableFieldNames.get(tableKeyIds.get(i)));
     }
 
-    SamzaSqlRelMessageJoinFunction joinFn =
-        new SamzaSqlRelMessageJoinFunction(join.getJoinType(), isTablePosOnRight, streamKeyIds, streamFieldNames,
-            tableKeyIds, tableFieldNames);
+    if (tableNode.isRemoteTable()) {
+      String remoteTableName = tableNode.getSourceName();
+      StreamTableJoinFunction joinFn = new SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
+          context.getTableKeyConverter(remoteTableName), streamNode, tableNode, join.getJoinType(), queryId);
+
+      return inputStream.join(table, joinFn);
+    }
+
+    // Join with the local table
+
+    StreamTableJoinFunction joinFn = new SamzaSqlLocalTableJoinFunction(streamNode, tableNode, join.getJoinType());
 
     SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
         (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
@@ -126,20 +150,17 @@ class JoinTranslator {
     // Always re-partition the messages from the input stream by the composite key and then join the messages
     // with the table. For the composite key, provide the corresponding table names in the key instead of using
     // the names from the stream as the lookup needs to be done based on what is stored in the local table.
-    MessageStream<SamzaSqlRelMessage> outputStream =
+    return
         inputStream
             .partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds,
-                getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds)),
-                m -> m,
-                KVSerde.of(keySerde, valueSerde),
-                intermediateStreamPrefix + "stream_" + joinId)
+            getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds)), m -> m, KVSerde.of(keySerde, valueSerde),
+            intermediateStreamPrefix + "stream_" + joinId)
             .map(KV::getValue)
             .join(table, joinFn);
-
-    context.registerMessageStream(join.getId(), outputStream);
   }
 
-  private void validateJoinQuery(LogicalJoin join) {
+  private void validateJoinQuery(LogicalJoin join, JoinInputNode.InputType inputTypeOnLeft,
+      JoinInputNode.InputType inputTypeOnRight) {
     JoinRelType joinRelType = join.getJoinType();
 
     if (joinRelType.compareTo(JoinRelType.INNER) != 0 && joinRelType.compareTo(JoinRelType.LEFT) != 0
@@ -147,8 +168,8 @@ class JoinTranslator {
       throw new SamzaException("Query with only INNER and LEFT/RIGHT OUTER join are supported.");
     }
 
-    boolean isTablePosOnLeft = isTable(join.getLeft());
-    boolean isTablePosOnRight = isTable(join.getRight());
+    boolean isTablePosOnLeft = (inputTypeOnLeft != JoinInputNode.InputType.STREAM);
+    boolean isTablePosOnRight = (inputTypeOnRight != JoinInputNode.InputType.STREAM);
 
     if (!isTablePosOnLeft && !isTablePosOnRight) {
       throw new SamzaException("Invalid query with both sides of join being denoted as 'stream'. "
@@ -221,12 +242,13 @@ class JoinTranslator {
     // the sql 'from' clause. Let's put the operand with smaller index in leftRef and larger
     // index in rightRef so that the order of operands in the join condition is in the order
     // the stream and table are specified in the 'from' clause.
+
     RexInputRef leftRef = (RexInputRef) operands.get(0);
     RexInputRef rightRef = (RexInputRef) operands.get(1);
 
     // Let's validate the key used in the join condition.
-    validateKey(leftRef);
-    validateKey(rightRef);
+    validateJoinKeys(leftRef);
+    validateJoinKeys(rightRef);
 
     if (leftRef.getIndex() > rightRef.getIndex()) {
       RexInputRef tmpRef = leftRef;
@@ -240,12 +262,14 @@ class JoinTranslator {
     tableKeyIds.add(isTablePosOnRight ? deltaKeyIdx : leftRef.getIndex());
   }
 
-  private void validateKey(RexInputRef ref) {
+  private void validateJoinKeys(RexInputRef ref) {
     SqlTypeName sqlTypeName = ref.getType().getSqlTypeName();
-    // Only primitive types are supported in the key
+
+    // Primitive types and ANY (for the record key) are supported in the key
     if (sqlTypeName != SqlTypeName.BOOLEAN && sqlTypeName != SqlTypeName.TINYINT && sqlTypeName != SqlTypeName.SMALLINT
         && sqlTypeName != SqlTypeName.INTEGER && sqlTypeName != SqlTypeName.CHAR && sqlTypeName != SqlTypeName.BIGINT
-        && sqlTypeName != SqlTypeName.VARCHAR && sqlTypeName != SqlTypeName.DOUBLE && sqlTypeName != SqlTypeName.FLOAT) {
+        && sqlTypeName != SqlTypeName.VARCHAR && sqlTypeName != SqlTypeName.DOUBLE && sqlTypeName != SqlTypeName.FLOAT
+        && sqlTypeName != SqlTypeName.ANY) {
       log.error("Unsupported key type " + sqlTypeName + " used in join condition.");
       throw new SamzaException("Unsupported key type used in join condition.");
     }
@@ -257,9 +281,9 @@ class JoinTranslator {
         SqlExplainLevel.EXPPLAN_ATTRIBUTES);
   }
 
-  private SqlIOConfig resolveSourceConfigForTable(RelNode relNode) {
+  private SqlIOConfig resolveSourceConfigForTable(RelNode relNode, TranslatorContext context) {
     if (relNode instanceof LogicalProject) {
-      return resolveSourceConfigForTable(((LogicalProject) relNode).getInput());
+      return resolveSourceConfigForTable(((LogicalProject) relNode).getInput(), context);
     }
 
     // We are returning the sourceConfig for the table as null when the table is in another join rather than an output
@@ -268,57 +292,76 @@ class JoinTranslator {
       return null;
     }
 
-    String sourceName = String.join(".", relNode.getTable().getQualifiedName());
-    SqlIOConfig sourceConfig = ioResolver.fetchSourceInfo(sourceName);
+    String sourceName = SqlIOConfig.getSourceFromSourceParts(relNode.getTable().getQualifiedName());
+    SqlIOConfig sourceConfig =
+        context.getExecutionContext().getSamzaSqlApplicationConfig().getInputSystemStreamConfigBySource().get(sourceName);
     if (sourceConfig == null) {
       throw new SamzaException("Unsupported source found in join statement: " + sourceName);
     }
     return sourceConfig;
   }
 
-  private boolean isTable(RelNode relNode) {
+  private JoinInputNode.InputType getInputType(RelNode relNode, TranslatorContext context) {
+
     // NOTE: Any intermediate form of a join is always a stream. Eg: For the second level join of
     // stream-table-table join, the left side of the join is join output, which we always
     // assume to be a stream. The intermediate stream won't be an instance of EnumerableTableScan.
     // The join key(s) for the table could be an udf in which case the relNode would be LogicalProject.
+
     if (relNode instanceof EnumerableTableScan || relNode instanceof LogicalProject) {
-      SqlIOConfig sourceTableConfig = resolveSourceConfigForTable(relNode);
-      return sourceTableConfig != null && sourceTableConfig.getTableDescriptor().isPresent();
+      SqlIOConfig sourceTableConfig = resolveSourceConfigForTable(relNode, context);
+      if (sourceTableConfig == null || !sourceTableConfig.getTableDescriptor().isPresent()) {
+        return JoinInputNode.InputType.STREAM;
+      } else if (sourceTableConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor) {
+        return JoinInputNode.InputType.REMOTE_TABLE;
+      } else {
+        return JoinInputNode.InputType.LOCAL_TABLE;
+      }
     } else {
-      return false;
+      return JoinInputNode.InputType.STREAM;
     }
   }
 
-  private Table loadLocalTable(boolean isTablePosOnRight, List<Integer> tableKeyIds, LogicalJoin join, TranslatorContext context) {
-    RelNode relNode = isTablePosOnRight ? join.getRight() : join.getLeft();
-
-    MessageStream<SamzaSqlRelMessage> relOutputStream = context.getMessageStream(relNode.getId());
+  private Table getTable(JoinInputNode tableNode, TranslatorContext context) {
 
-    SqlIOConfig sourceTableConfig = resolveSourceConfigForTable(relNode);
+    SqlIOConfig sourceTableConfig = resolveSourceConfigForTable(tableNode.getRelNode(), context);
 
     if (sourceTableConfig == null || !sourceTableConfig.getTableDescriptor().isPresent()) {
-      String errMsg = "Failed to resolve table source in join operation: node=" + relNode;
+      String errMsg = "Failed to resolve table source in join operation: node=" + tableNode.getRelNode();
       log.error(errMsg);
       throw new SamzaException(errMsg);
     }
 
-    // Create a table backed by RocksDb store with the fields in the join condition as composite key and relational
-    // message as the value. Send the messages from the input stream denoted as 'table' to the created table store.
     Table<KV<SamzaSqlRelRecord, SamzaSqlRelMessage>> table =
         context.getStreamAppDescriptor().getTable(sourceTableConfig.getTableDescriptor().get());
 
+    if (tableNode.isRemoteTable()) {
+      return table;
+    }
+
+    // If local table, load the table.
+
+    // Load the local table with the fields in the join condition as composite key and relational message as the value.
+    // Send the messages from the input stream denoted as 'table' to the created table store.
+
+    MessageStream<SamzaSqlRelMessage> relOutputStream = context.getMessageStream(tableNode.getRelNode().getId());
+
     SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
         (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
     SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
         (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
+
+    List<Integer> tableKeyIds = tableNode.getKeyIds();
+
     // Let's always repartition by the join fields as key before sending the key and value to the table.
     // We need to repartition the stream denoted as table to ensure that both the stream and table that are joined
-    // have the same partitioning scheme and partition key.
+    // have the same partitioning scheme with the same partition key and number. Please note that bootstrap semantic is
+    // not propagated to the intermediate streams. Please refer SAMZA-1613 for more details on this. Subsequently, the
+    // results are consistent only after the local table is caught up.
+
     relOutputStream
-        .partitionBy(m -> createSamzaSqlCompositeKey(m, tableKeyIds),
-            m -> m,
-            KVSerde.of(keySerde, valueSerde),
-            intermediateStreamPrefix + "table_" + joinId)
+        .partitionBy(m -> createSamzaSqlCompositeKey(m, tableKeyIds), m -> m,
+            KVSerde.of(keySerde, valueSerde), intermediateStreamPrefix + "table_" + joinId)
         .sendTo(table);
 
     return table;

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 4c5f11c..7f3c11e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -104,15 +104,13 @@ public class QueryTranslator {
    * For unit testing only
    */
   @VisibleForTesting
-  public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc) {
+  public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc, int queryId) {
     QueryPlanner planner =
         new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getSystemStreamConfigsBySource(),
             sqlConfig.getUdfMetadata());
     final RelRoot relRoot = planner.plan(queryInfo.getSql());
-    int queryId = 1;
     SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(sqlConfig);
-    Map<String, SamzaRelConverter> converters = sqlConfig.getSamzaRelConverters();
-    TranslatorContext translatorContext = new TranslatorContext(appDesc, relRoot, executionContext, converters);
+    TranslatorContext translatorContext = new TranslatorContext(appDesc, relRoot, executionContext);
     translate(relRoot, translatorContext, queryId);
     Map<Integer, TranslatorContext> translatorContexts = new HashMap<>();
     translatorContexts.put(queryId, translatorContext.clone());
@@ -124,7 +122,6 @@ public class QueryTranslator {
   }
 
   public void translate(RelRoot relRoot, TranslatorContext translatorContext, int queryId) {
-    final SqlIOResolver ioResolver = translatorContext.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver();
     final RelNode node = relRoot.project();
     ScanTranslator scanTranslator =
         new ScanTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getInputSystemStreamConfigBySource(), queryId);
@@ -177,7 +174,7 @@ public class QueryTranslator {
       public RelNode visit(LogicalJoin join) {
         RelNode node = super.visit(join);
         joinId++;
-        new JoinTranslator(joinId, ioResolver, sqlConfig.getMetadataTopicPrefix())
+        new JoinTranslator(joinId, sqlConfig.getMetadataTopicPrefix(), queryId)
             .translate(join, translatorContext);
         return node;
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlLocalTableJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlLocalTableJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlLocalTableJoinFunction.java
new file mode 100644
index 0000000..f39f39a
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlLocalTableJoinFunction.java
@@ -0,0 +1,54 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.List;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+
+
+/**
+ * This class joins incoming {@link SamzaSqlRelMessage} with records {@link KV}&lt;{@link SamzaSqlRelRecord}, {@link SamzaSqlRelMessage}&gt;}
+ * from local table with the join key being {@link SamzaSqlRelRecord}.
+ */
+public class SamzaSqlLocalTableJoinFunction
+    extends SamzaSqlTableJoinFunction<SamzaSqlRelRecord, KV<SamzaSqlRelRecord, SamzaSqlRelMessage>> {
+
+  SamzaSqlLocalTableJoinFunction(JoinInputNode streamNode, JoinInputNode tableNode, JoinRelType joinRelType) {
+    super(streamNode, tableNode, joinRelType);
+  }
+
+  @Override
+  protected List<Object> getTableRelRecordFieldValues(KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record) {
+    return record.getValue().getSamzaSqlRelRecord().getFieldValues();
+  }
+
+  @Override
+  public SamzaSqlRelRecord getMessageKey(SamzaSqlRelMessage message) {
+    return getMessageKeyRelRecord(message);
+  }
+
+  @Override
+  public SamzaSqlRelRecord getRecordKey(KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record) {
+    return record.getKey();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
deleted file mode 100644
index d0a3d11..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.translator;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.commons.lang.Validate;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.functions.StreamTableJoinFunction;
-import org.apache.samza.sql.SamzaSqlRelRecord;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.samza.sql.data.SamzaSqlRelMessage.createSamzaSqlCompositeKey;
-import static org.apache.samza.sql.data.SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames;
-
-
-/**
- * This class joins incoming {@link SamzaSqlRelMessage} from a stream with the records in a table with the join key
- * being {@link SamzaSqlRelRecord}
- */
-public class SamzaSqlRelMessageJoinFunction
-    implements StreamTableJoinFunction<SamzaSqlRelRecord, SamzaSqlRelMessage, KV<SamzaSqlRelRecord, SamzaSqlRelMessage>, SamzaSqlRelMessage> {
-
-  private static final Logger log = LoggerFactory.getLogger(SamzaSqlRelMessageJoinFunction.class);
-
-  private final JoinRelType joinRelType;
-  private final boolean isTablePosOnRight;
-  private final ArrayList<Integer> streamFieldIds;
-  // Table field names are used in the outer join when the table record is not found.
-  private final ArrayList<Integer> tableKeyIds;
-  private final ArrayList<String> tableFieldNames;
-  private final ArrayList<String> outFieldNames;
-
-  SamzaSqlRelMessageJoinFunction(JoinRelType joinRelType, boolean isTablePosOnRight,
-      List<Integer> streamFieldIds, List<String> streamFieldNames, List<Integer> tableKeyIds,
-      List<String> tableFieldNames) {
-    this.joinRelType = joinRelType;
-    this.isTablePosOnRight = isTablePosOnRight;
-    Validate.isTrue((joinRelType.compareTo(JoinRelType.LEFT) == 0 && isTablePosOnRight) ||
-        (joinRelType.compareTo(JoinRelType.RIGHT) == 0 && !isTablePosOnRight) ||
-        joinRelType.compareTo(JoinRelType.INNER) == 0);
-    this.streamFieldIds = new ArrayList<>(streamFieldIds);
-    this.tableKeyIds = new ArrayList<>(tableKeyIds);
-    this.tableFieldNames = new ArrayList<>(tableFieldNames);
-    this.outFieldNames = new ArrayList<>();
-    if (isTablePosOnRight) {
-      outFieldNames.addAll(streamFieldNames);
-    }
-    outFieldNames.addAll(tableFieldNames);
-    if (!isTablePosOnRight) {
-      outFieldNames.addAll(streamFieldNames);
-    }
-  }
-
-  @Override
-  public SamzaSqlRelMessage apply(SamzaSqlRelMessage message, KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record) {
-
-    if (joinRelType.compareTo(JoinRelType.INNER) == 0 && record == null) {
-      log.debug("Inner Join: Record not found for the message with key: " + getMessageKey(message));
-      // Returning null would result in Join operator implementation to filter out the message.
-      return null;
-    }
-
-    // The resulting join output should be a SamzaSqlRelMessage containing the fields from both the stream message and
-    // table record. The order of stream message fields and table record fields are dictated by the position of stream
-    // and table in the 'from' clause of sql query. The output should also include the keys from both the stream message
-    // and the table record.
-    List<Object> outFieldValues = new ArrayList<>();
-
-    // If table position is on the right, add the stream message fields first
-    if (isTablePosOnRight) {
-      outFieldValues.addAll(message.getSamzaSqlRelRecord().getFieldValues());
-    }
-
-    // Add the table record fields.
-    if (record != null) {
-      outFieldValues.addAll(record.getValue().getSamzaSqlRelRecord().getFieldValues());
-    } else {
-      // Table record could be null as the record could not be found in the store. This can
-      // happen for outer joins. Add nulls to all the field values in the output message.
-      tableFieldNames.forEach(s -> outFieldValues.add(null));
-    }
-
-    // If table position is on the left, add the stream message fields last
-    if (!isTablePosOnRight) {
-      outFieldValues.addAll(message.getSamzaSqlRelRecord().getFieldValues());
-    }
-
-    return new SamzaSqlRelMessage(outFieldNames, outFieldValues);
-  }
-
-  @Override
-  public SamzaSqlRelRecord getMessageKey(SamzaSqlRelMessage message) {
-    return createSamzaSqlCompositeKey(message, streamFieldIds,
-        getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds));
-  }
-
-  @Override
-  public SamzaSqlRelRecord getRecordKey(KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record) {
-    return record.getKey();
-  }
-
-  @Override
-  public void close() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
new file mode 100644
index 0000000..11222da
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
@@ -0,0 +1,79 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.List;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
+
+
+/**
+ * This class joins incoming {@link SamzaSqlRelMessage} with records {@link KV} from a remote table with the join key
+ * defined by the format of the table key.
+ */
+public class SamzaSqlRemoteTableJoinFunction
+    extends SamzaSqlTableJoinFunction<Object, KV> {
+
+  private transient SamzaRelConverter msgConverter;
+  private transient SamzaRelTableKeyConverter relTableKeyConverter;
+  private final String tableName;
+  private final int queryId;
+
+  SamzaSqlRemoteTableJoinFunction(SamzaRelConverter msgConverter, SamzaRelTableKeyConverter tableKeyConverter,
+      JoinInputNode streamNode, JoinInputNode tableNode, JoinRelType joinRelType, int queryId) {
+    super(streamNode, tableNode, joinRelType);
+
+    this.msgConverter = msgConverter;
+    this.relTableKeyConverter = tableKeyConverter;
+    this.tableName = tableNode.getSourceName();
+    this.queryId = queryId;
+  }
+
+  @Override
+  public void init(Context context) {
+    TranslatorContext translatorContext =
+        ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
+    this.msgConverter = translatorContext.getMsgConverter(tableName);
+    this.relTableKeyConverter = translatorContext.getTableKeyConverter(tableName);
+  }
+
+  @Override
+  protected List<Object> getTableRelRecordFieldValues(KV record) {
+    // Using the message rel converter, convert message to sql rel message and add to output values.
+    SamzaSqlRelMessage relMessage = msgConverter.convertToRelMessage(record);
+    return relMessage.getSamzaSqlRelRecord().getFieldValues();
+  }
+
+  @Override
+  public Object getMessageKey(SamzaSqlRelMessage message) {
+    // Using the table key converter, convert message key from rel format to the record key format.
+    return relTableKeyConverter.convertToTableKeyFormat(getMessageKeyRelRecord(message));
+  }
+
+  @Override
+  public Object getRecordKey(KV record) {
+    return record.getKey();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
new file mode 100644
index 0000000..bbbf1d6
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
@@ -0,0 +1,121 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.sql.data.SamzaSqlRelMessage.createSamzaSqlCompositeKey;
+import static org.apache.samza.sql.data.SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames;
+
+
+/**
+ * This abstract class joins incoming {@link SamzaSqlRelMessage} with records from a table with the join key.
+ */
+public abstract class SamzaSqlTableJoinFunction<K, R>
+    implements StreamTableJoinFunction<K, SamzaSqlRelMessage, R, SamzaSqlRelMessage> {
+
+  private static final Logger log = LoggerFactory.getLogger(SamzaSqlTableJoinFunction.class);
+
+  private final JoinRelType joinRelType;
+  private final boolean isTablePosOnRight;
+  private final ArrayList<Integer> streamFieldIds;
+  private final ArrayList<Integer> tableKeyIds;
+  // Table field names are used in the outer join when the table record is not found.
+  private final ArrayList<String> tableFieldNames;
+  private final ArrayList<String> outFieldNames;
+
+  SamzaSqlTableJoinFunction(JoinInputNode streamNode, JoinInputNode tableNode, JoinRelType joinRelType) {
+    this.joinRelType = joinRelType;
+    this.isTablePosOnRight = tableNode.isPosOnRight();
+
+    Validate.isTrue((joinRelType.compareTo(JoinRelType.LEFT) == 0 && isTablePosOnRight) ||
+        (joinRelType.compareTo(JoinRelType.RIGHT) == 0 && !isTablePosOnRight) ||
+        joinRelType.compareTo(JoinRelType.INNER) == 0);
+
+    this.streamFieldIds = new ArrayList<>(streamNode.getKeyIds());
+    this.tableKeyIds = new ArrayList<>(tableNode.getKeyIds());
+    this.tableFieldNames = new ArrayList<>(tableNode.getFieldNames());
+
+    this.outFieldNames = new ArrayList<>();
+    if (isTablePosOnRight) {
+      outFieldNames.addAll(streamNode.getFieldNames());
+      outFieldNames.addAll(tableFieldNames);
+    } else {
+      outFieldNames.addAll(tableFieldNames);
+      outFieldNames.addAll(streamNode.getFieldNames());
+    }
+  }
+
+  @Override
+  public SamzaSqlRelMessage apply(SamzaSqlRelMessage message, R record) {
+
+    if (joinRelType.compareTo(JoinRelType.INNER) == 0 && record == null) {
+      log.debug("Inner Join: Record not found for the message with key: " + getMessageKey(message));
+      // Returning null would result in Join operator implementation to filter out the message.
+      return null;
+    }
+
+    // The resulting join output should be a SamzaSqlRelMessage containing the fields from both the stream message and
+    // table record. The order of stream message fields and table record fields are dictated by the position of stream
+    // and table in the 'from' clause of sql query. The output should also include the keys from both the stream message
+    // and the table record.
+    List<Object> outFieldValues = new ArrayList<>();
+
+    // If table position is on the right, add the stream message fields first
+    if (isTablePosOnRight) {
+      outFieldValues.addAll(message.getSamzaSqlRelRecord().getFieldValues());
+    }
+
+    // Add the table record fields.
+    if (record != null) {
+      outFieldValues.addAll(getTableRelRecordFieldValues(record));
+    } else {
+      // Table record could be null as the record could not be found in the store. This can
+      // happen for outer joins. Add nulls to all the field values in the output message.
+      tableFieldNames.forEach(s -> outFieldValues.add(null));
+    }
+
+    // If table position is on the left, add the stream message fields last
+    if (!isTablePosOnRight) {
+      outFieldValues.addAll(message.getSamzaSqlRelRecord().getFieldValues());
+    }
+
+    return new SamzaSqlRelMessage(outFieldNames, outFieldValues);
+  }
+
+  protected abstract List<Object> getTableRelRecordFieldValues(R record);
+
+  protected SamzaSqlRelRecord getMessageKeyRelRecord(SamzaSqlRelMessage message) {
+    return createSamzaSqlCompositeKey(message, streamFieldIds,
+        getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds));
+  }
+
+  @Override
+  public void close() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 27138a4..2615aad 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -36,6 +36,7 @@ import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 
 
 /**
@@ -92,6 +93,17 @@ class ScanTranslator {
     final String streamName = sqlIOConfig.getStreamName();
     final String source = sqlIOConfig.getSource();
 
+    final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent() &&
+        (sqlIOConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor);
+
+    // For remote table, we don't have an input stream descriptor. The table descriptor is already defined by the
+    // SqlIOResolverFactory.
+    // For local table, even though table descriptor is already defined, we still need to create the input stream
+    // descriptor to load the local table.
+    if (isRemoteTable) {
+      return;
+    }
+
     KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
     DelegatingSystemDescriptor
         sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
index 099217b..5990897 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.operators.MessageStream;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
 import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.sql.data.RexToJavaCompiler;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
@@ -50,6 +51,7 @@ public class TranslatorContext implements Cloneable {
   private final StreamApplicationDescriptor streamAppDesc;
   private final RexToJavaCompiler compiler;
   private final Map<String, SamzaRelConverter> relSamzaConverters;
+  private final Map<String, SamzaRelTableKeyConverter> relTableKeyConverters;
   private final Map<Integer, MessageStream> messageStreams;
   private final Map<Integer, RelNode> relNodes;
   private final Map<String, DelegatingSystemDescriptor> systemDescriptors;
@@ -124,6 +126,7 @@ public class TranslatorContext implements Cloneable {
     this.streamAppDesc  = other.streamAppDesc;
     this.compiler = other.compiler;
     this.relSamzaConverters = other.relSamzaConverters;
+    this.relTableKeyConverters = other.relTableKeyConverters;
     this.messageStreams = other.messageStreams;
     this.relNodes = other.relNodes;
     this.executionContext = other.executionContext.clone();
@@ -136,14 +139,14 @@ public class TranslatorContext implements Cloneable {
    * @param streamAppDesc Samza's streamAppDesc that is populated during the translation.
    * @param relRoot Root of the relational graph from calcite.
    * @param executionContext the execution context
-   * @param converters the map of schema to RelData converters
    */
-  public TranslatorContext(StreamApplicationDescriptor streamAppDesc, RelRoot relRoot, SamzaSqlExecutionContext executionContext, Map<String, SamzaRelConverter> converters) {
+  public TranslatorContext(StreamApplicationDescriptor streamAppDesc, RelRoot relRoot, SamzaSqlExecutionContext executionContext) {
     this.streamAppDesc = streamAppDesc;
     this.compiler = createExpressionCompiler(relRoot);
     this.executionContext = executionContext;
     this.dataContext = new DataContextImpl();
-    this.relSamzaConverters = converters;
+    this.relSamzaConverters = executionContext.getSamzaSqlApplicationConfig().getSamzaRelConverters();
+    this.relTableKeyConverters = executionContext.getSamzaSqlApplicationConfig().getSamzaRelTableKeyConverters();
     this.messageStreams = new HashMap<>();
     this.relNodes = new HashMap<>();
     this.systemDescriptors = new HashMap<>();
@@ -212,6 +215,10 @@ public class TranslatorContext implements Cloneable {
     return this.relSamzaConverters.get(source);
   }
 
+  SamzaRelTableKeyConverter getTableKeyConverter(String source) {
+    return this.relTableKeyConverters.get(source);
+  }
+
   /**
    * This method helps to create a per task instance of translator context
    *

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java
new file mode 100644
index 0000000..5bb180e
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java
@@ -0,0 +1,115 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.e2e;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.system.TestAvroSystemFactory;
+import org.apache.samza.sql.testutil.JsonUtil;
+import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.apache.samza.sql.testutil.RemoteStoreIOResolverTestFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestSamzaSqlRemoteTable {
+  @Test
+  public void testSinkEndToEndWithKey() throws Exception {
+    int numMessages = 20;
+
+    RemoteStoreIOResolverTestFactory.records.clear();
+
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+
+    String sql = "Insert into testRemoteStore.testTable.`$table` select __key__, id, name from testavro.SIMPLE1";
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    appRunnable.runAndWaitForFinish();
+
+    Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size());
+  }
+
+  @Test (expected = AssertionError.class)
+  public void testSinkEndToEndWithoutKey() throws Exception {
+    int numMessages = 20;
+
+    RemoteStoreIOResolverTestFactory.records.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+
+    String sql = "Insert into testRemoteStore.testTable.`$table`(id,name) select id, name from testavro.SIMPLE1";
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    appRunnable.runAndWaitForFinish();
+
+    Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size());
+  }
+
+  @Test
+  public void testSourceEndToEndWithKey() throws Exception {
+    int numMessages = 20;
+
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+    populateProfileTable(staticConfigs);
+
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
+            + "       p.name as profileName, p.address as profileAddress "
+            + "from testRemoteStore.Profile.`$table` as p "
+            + "join testavro.PAGEVIEW as pv "
+            + " on p.__key__ = pv.profileId";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    appRunnable.runAndWaitForFinish();
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+            + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" :
+            ((GenericRecord) x.getMessage()).get("profileName").toString()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+    List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoin(numMessages);
+    Assert.assertEquals(expectedOutMessages, outMessages);
+  }
+
+  private void populateProfileTable(Map<String, String> staticConfigs) {
+    int numMessages = 20;
+
+    RemoteStoreIOResolverTestFactory.records.clear();
+
+    String sql = "Insert into testRemoteStore.Profile.`$table` select * from testavro.PROFILE";
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    appRunnable.runAndWaitForFinish();
+
+    Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
deleted file mode 100644
index ec0a993..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.e2e;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
-import org.apache.samza.sql.testutil.JsonUtil;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
-import org.apache.samza.sql.testutil.TestIOResolverFactory;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class TestSamzaSqlTable {
-  @Test
-  public void testEndToEnd() throws Exception {
-    int numMessages = 20;
-
-    TestIOResolverFactory.TestTable.records.clear();
-
-    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-
-    String sql1 = "Insert into testDb.testTable.`$table`(id,name) select id, name from testavro.SIMPLE1";
-    List<String> sqlStmts = Arrays.asList(sql1);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    appRunnable.runAndWaitForFinish();
-
-    Assert.assertEquals(numMessages, TestIOResolverFactory.TestTable.records.size());
-  }
-
-  @Test
-  public void testEndToEndWithKey() throws Exception {
-    int numMessages = 20;
-
-    TestIOResolverFactory.TestTable.records.clear();
-    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-
-    String sql1 = "Insert into testDb.testTable.`$table`(id,name) select id __key__, name from testavro.SIMPLE1";
-    List<String> sqlStmts = Arrays.asList(sql1);
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    appRunnable.runAndWaitForFinish();
-
-    Assert.assertEquals(numMessages, TestIOResolverFactory.TestTable.records.size());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index 41c809e..9a9e269 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -141,10 +141,10 @@ public class TestAvroSystemFactory implements SystemFactory {
     private final int numMessages;
     private final boolean includeNullForeignKeys;
     private final long sleepBetweenPollsMs;
-    private final Set<SystemStreamPartition> simpleRecordMap = new HashSet<>();
-    private final Set<SystemStreamPartition> profileRecordMap = new HashSet<>();
-    private final Set<SystemStreamPartition> companyRecordMap = new HashSet<>();
-    private final Set<SystemStreamPartition> pageViewRecordMap = new HashSet<>();
+    private final Set<SystemStreamPartition> simpleRecordSsps = new HashSet<>();
+    private final Set<SystemStreamPartition> profileRecordSsps = new HashSet<>();
+    private final Set<SystemStreamPartition> companyRecordSsps = new HashSet<>();
+    private final Set<SystemStreamPartition> pageViewRecordSsps = new HashSet<>();
     private final Map<SystemStreamPartition, Integer> curMessagesPerSsp = new HashMap<>();
 
     public TestAvroSystemConsumer(String systemName, Config config) {
@@ -165,16 +165,16 @@ public class TestAvroSystemFactory implements SystemFactory {
     @Override
     public void register(SystemStreamPartition systemStreamPartition, String offset) {
       if (systemStreamPartition.getStream().toLowerCase().contains("simple1")) {
-        simpleRecordMap.add(systemStreamPartition);
+        simpleRecordSsps.add(systemStreamPartition);
       }
       if (systemStreamPartition.getStream().toLowerCase().contains("profile")) {
-        profileRecordMap.add(systemStreamPartition);
+        profileRecordSsps.add(systemStreamPartition);
       }
       if (systemStreamPartition.getStream().toLowerCase().contains("company")) {
-        companyRecordMap.add(systemStreamPartition);
+        companyRecordSsps.add(systemStreamPartition);
       }
       if (systemStreamPartition.getStream().toLowerCase().contains("pageview")) {
-        pageViewRecordMap.add(systemStreamPartition);
+        pageViewRecordSsps.add(systemStreamPartition);
       }
       curMessagesPerSsp.put(systemStreamPartition, 0);
     }
@@ -202,17 +202,20 @@ public class TestAvroSystemFactory implements SystemFactory {
     }
 
     private Object getKey(int index, SystemStreamPartition ssp) {
+      if (profileRecordSsps.contains(ssp) || companyRecordSsps.contains(ssp)) {
+        return index; // Keep this value the same as the profile/company record's "id" field.
+      }
       return "key" + index;
     }
 
     private Object getData(int index, SystemStreamPartition ssp) {
-      if (simpleRecordMap.contains(ssp)) {
+      if (simpleRecordSsps.contains(ssp)) {
         return createSimpleRecord(index);
-      } else if (profileRecordMap.contains(ssp)) {
+      } else if (profileRecordSsps.contains(ssp)) {
         return createProfileRecord(index);
-      } else if (companyRecordMap.contains(ssp)) {
+      } else if (companyRecordSsps.contains(ssp)) {
         return createCompanyRecord(index);
-      } else if (pageViewRecordMap.contains(ssp)) {
+      } else if (pageViewRecordSsps.contains(ssp)) {
         return createPageViewRecord(index);
       } else {
         return createComplexRecord(index);

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/testutil/RemoteStoreIOResolverTestFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/RemoteStoreIOResolverTestFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/RemoteStoreIOResolverTestFactory.java
new file mode 100644
index 0000000..6ee9ee8
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/RemoteStoreIOResolverTestFactory.java
@@ -0,0 +1,147 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
+import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.CFG_METADATA_TOPIC_PREFIX;
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.DEFAULT_METADATA_TOPIC_PREFIX;
+
+
+public class RemoteStoreIOResolverTestFactory implements SqlIOResolverFactory {
+  public static final String TEST_REMOTE_STORE_SYSTEM = "testRemoteStore";
+  public static final String TEST_TABLE_ID = "testTableId";
+
+  public static transient Map<Object, Object> records = new HashMap<>();
+
+  @Override
+  public SqlIOResolver create(Config config, Config fullConfig) {
+    return new TestRemoteStoreIOResolver(config);
+  }
+
+  public static class InMemoryWriteFunction implements TableWriteFunction<Object, Object> {
+
+    @Override
+    public CompletableFuture<Void> putAsync(Object key, Object record) {
+      records.put(key.toString(), record);
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteAsync(Object key) {
+      records.remove(key.toString());
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
+  static class InMemoryReadFunction implements TableReadFunction<Object, Object> {
+
+    @Override
+    public CompletableFuture<Object> getAsync(Object key) {
+      return CompletableFuture.completedFuture(records.get(key.toString()));
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
+  private class TestRemoteStoreIOResolver implements SqlIOResolver {
+    private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
+    private final Config config;
+    private final Map<String, TableDescriptor> tableDescMap = new HashMap<>();
+    private final String changeLogStorePrefix;
+
+    public TestRemoteStoreIOResolver(Config config) {
+      this.config = config;
+      String metadataTopicPrefix = config.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
+      this.changeLogStorePrefix = metadataTopicPrefix + (metadataTopicPrefix.isEmpty() ? "" : "_");
+    }
+
+    private SqlIOConfig fetchIOInfo(String ioName, boolean isSink) {
+      String[] sourceComponents = ioName.split("\\.");
+      int systemIdx = 0;
+      int endIdx = sourceComponents.length - 1;
+      int streamIdx = endIdx;
+      TableDescriptor tableDescriptor = null;
+
+      if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
+        streamIdx = endIdx - 1;
+
+        tableDescriptor = tableDescMap.get(ioName);
+
+        if (tableDescriptor == null) {
+          if (isSink) {
+            tableDescriptor = new RemoteTableDescriptor<>(TEST_TABLE_ID + "-" + ioName.replace(".", "-").replace("$", "-"))
+                .withReadFunction(new InMemoryReadFunction())
+                .withWriteFunction(new InMemoryWriteFunction());
+          } else if (sourceComponents[systemIdx].equals(TEST_REMOTE_STORE_SYSTEM)) {
+            tableDescriptor = new RemoteTableDescriptor<>(TEST_TABLE_ID + "-" + ioName.replace(".", "-").replace("$", "-"))
+                .withReadFunction(new InMemoryReadFunction());
+          } else {
+            // A local table
+            String tableId = changeLogStorePrefix + "InputTable-" + ioName.replace(".", "-").replace("$", "-");
+            SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
+                (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
+            SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
+                (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
+            tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(keySerde, valueSerde)).withChangelogEnabled();
+          }
+          tableDescMap.put(ioName, tableDescriptor);
+        }
+      }
+
+      Config systemConfigs = config.subset(sourceComponents[systemIdx] + ".");
+      return new SqlIOConfig(sourceComponents[systemIdx], sourceComponents[streamIdx],
+          Arrays.asList(sourceComponents), systemConfigs, tableDescriptor);
+    }
+
+    @Override
+    public SqlIOConfig fetchSourceInfo(String sourceName) {
+      return fetchIOInfo(sourceName, false);
+    }
+
+    @Override
+    public SqlIOConfig fetchSinkInfo(String sinkName) {
+      return fetchIOInfo(sinkName, true);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverter.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverter.java
new file mode 100644
index 0000000..9f3f8a0
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverter.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.util.stream.Collectors;
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
+
+
+/**
+ * A sample {@link SamzaRelTableKeyConverter} used in tests to convert the join key to table format.
+ */
+public class SampleRelTableKeyConverter implements SamzaRelTableKeyConverter {
+
+  @Override
+  public Object convertToTableKeyFormat(SamzaSqlRelRecord relRecord) {
+    return relRecord.getFieldValues().stream().map(Object::toString).collect(Collectors.toList()).get(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverterFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverterFactory.java
new file mode 100644
index 0000000..2853ed4
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverterFactory.java
@@ -0,0 +1,41 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.util.HashMap;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverterFactory;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * A sample {@link SamzaRelTableKeyConverterFactory} used in tests to create {@link SampleRelTableKeyConverter}.
+ */
+public class SampleRelTableKeyConverterFactory implements SamzaRelTableKeyConverterFactory {
+
+  private final HashMap<SystemStream, SamzaRelTableKeyConverter> relConverters = new HashMap<>();
+
+  @Override
+  public SamzaRelTableKeyConverter create(SystemStream systemStream, Config config) {
+    return relConverters.computeIfAbsent(systemStream, ss -> new SampleRelTableKeyConverter());
+  }
+}