You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by at...@apache.org on 2018/12/11 19:16:57 UTC
samza git commit: SAMZA-2035: Detect CachingTableDescriptor as remote
table.
Repository: samza
Updated Branches:
refs/heads/master 9696154ab -> 2e5e19df3
SAMZA-2035: Detect CachingTableDescriptor as remote table.
Author: Aditya Toomula <at...@linkedin.com>
Reviewers: shenodaguirguis,weiqingy
Closes #854 from atoomula/cache and squashes the following commits:
ecc3b93f [Aditya Toomula] SAMZA-2035: Detect CachingTableDescriptor as remote table.
ecc3bdba [Aditya Toomula] Detect CachingTableDescriptor as remote table.
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2e5e19df
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2e5e19df
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2e5e19df
Branch: refs/heads/master
Commit: 2e5e19df3ce092f4f16afb97d0f2f7511b4dd33d
Parents: 9696154
Author: Aditya Toomula <at...@linkedin.com>
Authored: Tue Dec 11 11:16:49 2018 -0800
Committer: Aditya Toomula <at...@linkedin.com>
Committed: Tue Dec 11 11:16:49 2018 -0800
----------------------------------------------------------------------
.../main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java | 4 +++-
.../java/org/apache/samza/sql/translator/JoinTranslator.java | 4 +++-
.../java/org/apache/samza/sql/translator/ScanTranslator.java | 4 +++-
3 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/2e5e19df/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
index 5fa30e7..d92faae 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang.Validate;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
+import org.apache.samza.table.descriptors.CachingTableDescriptor;
import org.apache.samza.table.descriptors.RemoteTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.system.SystemStream;
@@ -155,6 +156,7 @@ public class SqlIOConfig {
}
public boolean isRemoteTable() {
- return tableDescriptor.isPresent() && tableDescriptor.get() instanceof RemoteTableDescriptor;
+ return tableDescriptor.isPresent() && (tableDescriptor.get() instanceof RemoteTableDescriptor ||
+ tableDescriptor.get() instanceof CachingTableDescriptor);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2e5e19df/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index c99551e..db0349d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -49,6 +49,7 @@ import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
import org.apache.samza.table.Table;
+import org.apache.samza.table.descriptors.CachingTableDescriptor;
import org.apache.samza.table.descriptors.RemoteTableDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -322,7 +323,8 @@ class JoinTranslator {
SqlIOConfig sourceTableConfig = resolveSourceConfigForTable(relNode, context);
if (sourceTableConfig == null || !sourceTableConfig.getTableDescriptor().isPresent()) {
return JoinInputNode.InputType.STREAM;
- } else if (sourceTableConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor) {
+ } else if (sourceTableConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor ||
+ sourceTableConfig.getTableDescriptor().get() instanceof CachingTableDescriptor) {
return JoinInputNode.InputType.REMOTE_TABLE;
} else {
return JoinInputNode.InputType.LOCAL_TABLE;
http://git-wip-us.apache.org/repos/asf/samza/blob/2e5e19df/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index aa73f94..7f1ff39 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -43,6 +43,7 @@ import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.table.descriptors.CachingTableDescriptor;
import org.apache.samza.table.descriptors.RemoteTableDescriptor;
@@ -157,7 +158,8 @@ class ScanTranslator {
final String source = sqlIOConfig.getSource();
final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent() &&
- (sqlIOConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor);
+ (sqlIOConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor ||
+ sqlIOConfig.getTableDescriptor().get() instanceof CachingTableDescriptor);
// For remote table, we don't have an input stream descriptor. The table descriptor is already defined by the
// SqlIOResolverFactory.