You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by at...@apache.org on 2018/12/11 19:16:57 UTC

samza git commit: SAMZA-2035: Detect CachingTableDescriptor as remote table.

Repository: samza
Updated Branches:
  refs/heads/master 9696154ab -> 2e5e19df3


SAMZA-2035: Detect CachingTableDescriptor as remote table.

Author: Aditya Toomula <at...@linkedin.com>

Reviewers: shenodaguirguis,weiqingy

Closes #854 from atoomula/cache and squashes the following commits:

ecc3b93f [Aditya Toomula] SAMZA-2035: Detect CachingTableDescriptor as remote table.
ecc3bdba [Aditya Toomula] Detect CachingTableDescriptor as remote table.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2e5e19df
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2e5e19df
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2e5e19df

Branch: refs/heads/master
Commit: 2e5e19df3ce092f4f16afb97d0f2f7511b4dd33d
Parents: 9696154
Author: Aditya Toomula <at...@linkedin.com>
Authored: Tue Dec 11 11:16:49 2018 -0800
Committer: Aditya Toomula <at...@linkedin.com>
Committed: Tue Dec 11 11:16:49 2018 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java   | 4 +++-
 .../java/org/apache/samza/sql/translator/JoinTranslator.java     | 4 +++-
 .../java/org/apache/samza/sql/translator/ScanTranslator.java     | 4 +++-
 3 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2e5e19df/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
index 5fa30e7..d92faae 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang.Validate;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
+import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.system.SystemStream;
@@ -155,6 +156,7 @@ public class SqlIOConfig {
   }
 
   public boolean isRemoteTable() {
-    return tableDescriptor.isPresent() && tableDescriptor.get() instanceof RemoteTableDescriptor;
+    return tableDescriptor.isPresent() && (tableDescriptor.get() instanceof RemoteTableDescriptor ||
+        tableDescriptor.get() instanceof CachingTableDescriptor);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2e5e19df/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index c99551e..db0349d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -49,6 +49,7 @@ import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
 import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
 import org.apache.samza.table.Table;
+import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -322,7 +323,8 @@ class JoinTranslator {
       SqlIOConfig sourceTableConfig = resolveSourceConfigForTable(relNode, context);
       if (sourceTableConfig == null || !sourceTableConfig.getTableDescriptor().isPresent()) {
         return JoinInputNode.InputType.STREAM;
-      } else if (sourceTableConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor) {
+      } else if (sourceTableConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor ||
+          sourceTableConfig.getTableDescriptor().get() instanceof CachingTableDescriptor) {
         return JoinInputNode.InputType.REMOTE_TABLE;
       } else {
         return JoinInputNode.InputType.LOCAL_TABLE;

http://git-wip-us.apache.org/repos/asf/samza/blob/2e5e19df/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index aa73f94..7f1ff39 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -43,6 +43,7 @@ import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
 import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 
 
@@ -157,7 +158,8 @@ class ScanTranslator {
     final String source = sqlIOConfig.getSource();
 
     final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent() &&
-        (sqlIOConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor);
+        (sqlIOConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor ||
+            sqlIOConfig.getTableDescriptor().get() instanceof CachingTableDescriptor);
 
     // For remote table, we don't have an input stream descriptor. The table descriptor is already defined by the
     // SqlIOResolverFactory.