You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/11/09 16:31:06 UTC
[1/2] samza git commit: SAMZA-1616: Samza-Sql - Support remote table for stream-table join
Repository: samza
Updated Branches:
refs/heads/master 3da75e61a -> 12f421ce2
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
index 76ebfc2..80b47eb 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
@@ -44,6 +44,8 @@ import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.system.TestAvroSystemFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+import static org.apache.samza.sql.testutil.RemoteStoreIOResolverTestFactory.TEST_REMOTE_STORE_SYSTEM;
+
/**
* Utility to hookup the configs needed to run the Samza Sql application.
@@ -51,7 +53,6 @@ import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
public class SamzaSqlTestConfig {
public static final String SAMZA_SYSTEM_TEST_AVRO = "testavro";
- public static final String SAMZA_SYSTEM_TEST_DB = "testDb";
public static Map<String, String> fetchStaticConfigsWithFactories(int numberOfMessages) {
return fetchStaticConfigsWithFactories(new HashMap<>(), numberOfMessages, false);
@@ -79,7 +80,7 @@ public class SamzaSqlTestConfig {
String configIOResolverDomain =
String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
staticConfigs.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
- TestIOResolverFactory.class.getName());
+ RemoteStoreIOResolverTestFactory.class.getName());
staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config");
String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
@@ -103,20 +104,26 @@ public class SamzaSqlTestConfig {
staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
- String testDbSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_DB);
- staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
- staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+ String testRemoteStoreSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", TEST_REMOTE_STORE_SYSTEM);
+ staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+ staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_TABLE_KEY_CONVERTER, "sample");
+ staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
String avroSamzaToRelMsgConverterDomain =
String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
AvroRelConverterFactory.class.getName());
- String testDbSamzaToRelMsgConverterDomain =
- String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, TestIOResolverFactory.TEST_DB_SYSTEM);
- staticConfigs.put(testDbSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+ String testRemoteStoreSamzaToRelMsgConverterDomain =
+ String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, TEST_REMOTE_STORE_SYSTEM);
+ staticConfigs.put(testRemoteStoreSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
AvroRelConverterFactory.class.getName());
+ String testRemoteStoreSamzaRelTableKeyConverterDomain =
+ String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_TABLE_KEY_CONVERTER_DOMAIN, "sample");
+ staticConfigs.put(testRemoteStoreSamzaRelTableKeyConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+ SampleRelTableKeyConverterFactory.class.getName());
+
String configAvroRelSchemaProviderDomain =
String.format(SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN, "config");
staticConfigs.put(configAvroRelSchemaProviderDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
@@ -158,9 +165,11 @@ public class SamzaSqlTestConfig {
staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
"testavro", "pageViewCountTopic"), PageViewCount.SCHEMA$.toString());
- staticConfigs.put(
- configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
- TestIOResolverFactory.TEST_DB_SYSTEM, "testTable"), SimpleRecord.SCHEMA$.toString());
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ TEST_REMOTE_STORE_SYSTEM, "testTable"), SimpleRecord.SCHEMA$.toString());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ TEST_REMOTE_STORE_SYSTEM, "Profile"), Profile.SCHEMA$.toString());
staticConfigs.putAll(props);
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
deleted file mode 100644
index a77be2e..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
-import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
-import org.apache.samza.table.descriptors.BaseTableDescriptor;
-import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.interfaces.SqlIOResolver;
-import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
-import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
-import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableProviderFactory;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.BaseTableProvider;
-
-import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.CFG_METADATA_TOPIC_PREFIX;
-import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.DEFAULT_METADATA_TOPIC_PREFIX;
-
-
-public class TestIOResolverFactory implements SqlIOResolverFactory {
- public static final String TEST_DB_SYSTEM = "testDb";
- public static final String TEST_TABLE_ID = "testDbId";
-
- @Override
- public SqlIOResolver create(Config config, Config fullConfig) {
- return new TestIOResolver(config);
- }
-
- static class TestTableDescriptor extends BaseTableDescriptor {
- protected TestTableDescriptor(String tableId) {
- super(tableId);
- }
-
- @Override
- public String getTableId() {
- return tableId;
- }
-
- @Override
- public TableSpec getTableSpec() {
- return new TableSpec(tableId, KVSerde.of(new NoOpSerde(), new NoOpSerde()), TestTableProviderFactory.class.getName(), new HashMap<>());
- }
- }
-
- public static class TestTable implements ReadWriteTable {
- public static Map<Object, Object> records = new HashMap<>();
- @Override
- public Object get(Object key) {
- throw new NotImplementedException();
- }
-
- @Override
- public CompletableFuture getAsync(Object key) {
- throw new NotImplementedException();
- }
-
- @Override
- public Map getAll(List keys) {
- throw new NotImplementedException();
- }
-
- @Override
- public CompletableFuture<Map> getAllAsync(List keys) {
- throw new NotImplementedException();
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public void put(Object key, Object value) {
- if (key == null) {
- records.put(System.nanoTime(), value);
- } else if (value != null) {
- records.put(key, value);
- } else {
- delete(key);
- }
- }
-
- @Override
- public CompletableFuture<Void> putAsync(Object key, Object value) {
- throw new NotImplementedException();
- }
-
- @Override
- public CompletableFuture<Void> putAllAsync(List list) {
- throw new NotImplementedException();
- }
-
- @Override
- public void delete(Object key) {
- records.remove(key);
- }
-
- @Override
- public CompletableFuture<Void> deleteAsync(Object key) {
- throw new NotImplementedException();
- }
-
- @Override
- public void deleteAll(List keys) {
- records.clear();
- }
-
- @Override
- public CompletableFuture<Void> deleteAllAsync(List keys) {
- throw new NotImplementedException();
- }
-
- @Override
- public void flush() {
- }
-
- @Override
- public void putAll(List entries) {
- throw new NotImplementedException();
- }
- }
-
- public static class TestTableProviderFactory implements TableProviderFactory {
- @Override
- public TableProvider getTableProvider(TableSpec tableSpec) {
- return new TestTableProvider();
- }
- }
-
- static class TestTableProvider extends BaseTableProvider {
-
- public TestTableProvider() {
- super(null);
- }
-
- @Override
- public Table getTable() {
- return new TestTable();
- }
-
- @Override
- public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
- return new HashMap<>();
- }
-
- @Override
- public void close() {
- }
- }
-
- private class TestIOResolver implements SqlIOResolver {
- private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
- private final Config config;
- private final Map<String, TableDescriptor> tableDescMap = new HashMap<>();
- private final String changeLogStorePrefix;
-
- public TestIOResolver(Config config) {
- this.config = config;
- String metadataTopicPrefix = config.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
- this.changeLogStorePrefix = metadataTopicPrefix + (metadataTopicPrefix.isEmpty() ? "" : "_");
- }
-
- private SqlIOConfig fetchIOInfo(String ioName, boolean isSink) {
- String[] sourceComponents = ioName.split("\\.");
- int systemIdx = 0;
- int endIdx = sourceComponents.length - 1;
- int streamIdx = endIdx;
- TableDescriptor tableDescriptor = null;
-
- if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
- streamIdx = endIdx - 1;
-
- tableDescriptor = tableDescMap.get(ioName);
-
- if (tableDescriptor == null) {
- if (isSink) {
- tableDescriptor = new TestTableDescriptor(TEST_TABLE_ID + tableDescMap.size());
- } else {
- String tableId = changeLogStorePrefix + "InputTable-" + ioName.replace(".", "-").replace("$", "-");
- SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
- (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
- SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
- (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
- tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(keySerde, valueSerde)).withChangelogEnabled();
- }
- tableDescMap.put(ioName, tableDescriptor);
- }
- }
-
- Config systemConfigs = config.subset(sourceComponents[systemIdx] + ".");
- return new SqlIOConfig(sourceComponents[systemIdx], sourceComponents[streamIdx],
- Arrays.asList(sourceComponents), systemConfigs, tableDescriptor);
- }
-
- @Override
- public SqlIOConfig fetchSourceInfo(String sourceName) {
- return fetchIOInfo(sourceName, false);
- }
-
- @Override
- public SqlIOConfig fetchSinkInfo(String sinkName) {
- return fetchIOInfo(sinkName, true);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
index 33c6d02..d496982 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -20,7 +20,9 @@ package org.apache.samza.sql.translator;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
import org.apache.calcite.plan.RelOptTable;
@@ -36,6 +38,9 @@ import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.sql.data.SamzaSqlExecutionContext;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.operators.spec.InputOperatorSpec;
@@ -48,7 +53,6 @@ import org.apache.samza.sql.data.Expression;
import org.apache.samza.sql.data.RexToJavaCompiler;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -76,8 +80,17 @@ import static org.mockito.Mockito.when;
public class TestJoinTranslator extends TranslatorTestBase {
@Test
- public void testTranslateStreamToTableJoin() throws IOException, ClassNotFoundException {
- // setup mock values to the constructor of FilterTranslator
+ public void testTranslateStreamToLocalTableJoin() throws IOException, ClassNotFoundException {
+ testTranslateStreamToTableJoin(false);
+ }
+
+ @Test
+ public void testTranslateStreamToRemoteTableJoin() throws IOException, ClassNotFoundException {
+ testTranslateStreamToTableJoin(true);
+ }
+
+ private void testTranslateStreamToTableJoin(boolean isRemoteTable) throws IOException, ClassNotFoundException {
+ // setup mock values to the constructor of JoinTranslator
LogicalJoin mockJoin = PowerMockito.mock(LogicalJoin.class);
TranslatorContext mockContext = mock(TranslatorContext.class);
RelNode mockLeftInput = PowerMockito.mock(EnumerableTableScan.class);
@@ -145,9 +158,6 @@ public class TestJoinTranslator extends TranslatorTestBase {
OutputStreamImpl mockOutputStream = mock(OutputStreamImpl.class);
when(mockInputOp.isKeyed()).thenReturn(true);
when(mockOutputStream.isKeyed()).thenReturn(true);
- IntermediateMessageStreamImpl
- mockPartitionedStream = new IntermediateMessageStreamImpl(mockAppDesc, mockInputOp, mockOutputStream);
- when(mockAppDesc.getIntermediateStream(any(String.class), any(Serde.class), eq(false))).thenReturn(mockPartitionedStream);
doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(3), any(MessageStream.class));
RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
@@ -155,16 +165,37 @@ public class TestJoinTranslator extends TranslatorTestBase {
Expression mockExpr = mock(Expression.class);
when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
- doAnswer(this.getRegisteredTableAnswer()).when(mockAppDesc).getTable(any(RocksDbTableDescriptor.class));
+ if (isRemoteTable) {
+ doAnswer(this.getRegisteredTableAnswer()).when(mockAppDesc).getTable(any(RemoteTableDescriptor.class));
+ } else {
+ IntermediateMessageStreamImpl
+ mockPartitionedStream = new IntermediateMessageStreamImpl(mockAppDesc, mockInputOp, mockOutputStream);
+ when(mockAppDesc.getIntermediateStream(any(String.class), any(Serde.class), eq(false))).thenReturn(mockPartitionedStream);
+ doAnswer(this.getRegisteredTableAnswer()).when(mockAppDesc).getTable(any(RocksDbTableDescriptor.class));
+ }
when(mockJoin.getJoinType()).thenReturn(JoinRelType.INNER);
- SqlIOResolver mockResolver = mock(SqlIOResolver.class);
+
+ SamzaSqlExecutionContext mockExecutionContext = mock(SamzaSqlExecutionContext.class);
+ when(mockContext.getExecutionContext()).thenReturn(mockExecutionContext);
+
+ SamzaSqlApplicationConfig mockAppConfig = mock(SamzaSqlApplicationConfig.class);
+ when(mockExecutionContext.getSamzaSqlApplicationConfig()).thenReturn(mockAppConfig);
+
+ Map<String, SqlIOConfig> ssConfigBySource = mock(HashMap.class);
+ when(mockAppConfig.getInputSystemStreamConfigBySource()).thenReturn(ssConfigBySource);
+
SqlIOConfig mockIOConfig = mock(SqlIOConfig.class);
- TableDescriptor mockTableDesc = mock(TableDescriptor.class);
- when(mockResolver.fetchSourceInfo(String.join(".", qualifiedTableName))).thenReturn(mockIOConfig);
+ TableDescriptor mockTableDesc;
+ if (isRemoteTable) {
+ mockTableDesc = mock(RemoteTableDescriptor.class);
+ } else {
+ mockTableDesc = mock(RocksDbTableDescriptor.class);
+ }
+ when(ssConfigBySource.get(String.join(".", qualifiedTableName))).thenReturn(mockIOConfig);
when(mockIOConfig.getTableDescriptor()).thenReturn(Optional.of(mockTableDesc));
// Apply translate() method to verify that we are getting the correct map operator constructed
- JoinTranslator joinTranslator = new JoinTranslator(3, mockResolver, "");
+ JoinTranslator joinTranslator = new JoinTranslator(3, "", 0);
joinTranslator.translate(mockJoin, mockContext);
// make sure that context has been registered with LogicFilter and output message streams
verify(mockContext, times(1)).registerMessageStream(3, this.getRegisteredMessageStream(3));
@@ -178,6 +209,11 @@ public class TestJoinTranslator extends TranslatorTestBase {
// Verify joinSpec has the corresponding setup
StreamTableJoinFunction joinFn = joinSpec.getJoinFn();
assertNotNull(joinFn);
+ if (isRemoteTable) {
+ assertTrue(joinFn instanceof SamzaSqlRemoteTableJoinFunction);
+ } else {
+ assertTrue(joinFn instanceof SamzaSqlLocalTableJoinFunction);
+ }
assertTrue(Whitebox.getInternalState(joinFn, "isTablePosOnRight").equals(false));
assertEquals(new ArrayList<Integer>() {{ this.add(0); }}, Whitebox.getInternalState(joinFn, "streamFieldIds"));
assertEquals(leftFieldNames, Whitebox.getInternalState(joinFn, "tableFieldNames"));
@@ -185,7 +221,5 @@ public class TestJoinTranslator extends TranslatorTestBase {
outputFieldNames.addAll(leftFieldNames);
outputFieldNames.addAll(rightStreamFieldNames);
assertEquals(outputFieldNames, Whitebox.getInternalState(joinFn, "outFieldNames"));
-
}
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index c7c82da..69f40d2 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -71,7 +71,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(streamApp -> { },samzaConfig);
QueryTranslator translator = new QueryTranslator(appDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), appDesc);
+ translator.translate(queryInfo.get(0), appDesc, 0);
OperatorSpecGraph specGraph = appDesc.getOperatorSpecGraph();
StreamConfig streamConfig = new StreamConfig(samzaConfig);
@@ -108,7 +108,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
StreamConfig streamConfig = new StreamConfig(samzaConfig);
@@ -145,7 +145,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
StreamConfig streamConfig = new StreamConfig(samzaConfig);
@@ -185,7 +185,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
}
@Test (expected = SamzaException.class)
@@ -210,7 +210,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
}
@Test (expected = SamzaException.class)
@@ -235,7 +235,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
}
@Test (expected = SamzaException.class)
@@ -259,7 +259,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
}
@Test (expected = SamzaException.class)
@@ -281,7 +281,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
}
@Test (expected = SamzaException.class)
@@ -305,7 +305,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
}
@Test (expected = SamzaException.class)
@@ -330,7 +330,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
}
@Test (expected = SamzaException.class)
@@ -354,7 +354,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
}
@Test (expected = SamzaException.class)
@@ -378,7 +378,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
}
@Test (expected = SamzaException.class)
@@ -402,7 +402,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
}
@Test (expected = SamzaException.class)
@@ -426,7 +426,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
}
@Test (expected = SamzaException.class)
@@ -454,7 +454,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
}
@Test
@@ -480,7 +480,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
StreamConfig streamConfig = new StreamConfig(samzaConfig);
@@ -547,7 +547,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
@@ -614,7 +614,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
@@ -682,7 +682,7 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
Assert.assertEquals(1, specGraph.getInputOperators().size());
@@ -711,6 +711,6 @@ public class TestQueryTranslator {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
- translator.translate(queryInfo.get(0), streamAppDesc);
+ translator.translate(queryInfo.get(0), streamAppDesc, 0);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlLocalTableJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlLocalTableJoinFunction.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlLocalTableJoinFunction.java
new file mode 100644
index 0000000..319a7c0
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlLocalTableJoinFunction.java
@@ -0,0 +1,161 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.powermock.api.mockito.PowerMockito.*;
+
+
+public class TestSamzaSqlLocalTableJoinFunction {
+
+ private List<String> streamFieldNames = Arrays.asList("field1", "field2", "field3", "field4");
+ private List<Object> streamFieldValues = Arrays.asList("value1", 1, null, "value4");
+ private List<String> tableFieldNames = Arrays.asList("field11", "field12", "field13", "field14");
+ private List<Object> tableFieldValues = Arrays.asList("value1", 1, null, "value5");
+
+ @Test
+ public void testWithInnerJoinWithTableOnRight() {
+ SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+ SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues);
+ JoinRelType joinRelType = JoinRelType.INNER;
+ List<Integer> streamKeyIds = Arrays.asList(0, 1);
+ List<Integer> tableKeyIds = Arrays.asList(0, 1);
+ SamzaSqlRelRecord compositeKey = SamzaSqlRelMessage.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
+ KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
+
+ JoinInputNode mockTableInputNode = mock(JoinInputNode.class);
+ when(mockTableInputNode.getKeyIds()).thenReturn(tableKeyIds);
+ when(mockTableInputNode.isPosOnRight()).thenReturn(true);
+ when(mockTableInputNode.getFieldNames()).thenReturn(tableFieldNames);
+
+ JoinInputNode mockStreamInputNode = mock(JoinInputNode.class);
+ when(mockStreamInputNode.getKeyIds()).thenReturn(streamKeyIds);
+ when(mockStreamInputNode.isPosOnRight()).thenReturn(false);
+ when(mockStreamInputNode.getFieldNames()).thenReturn(streamFieldNames);
+
+ SamzaSqlLocalTableJoinFunction joinFn =
+ new SamzaSqlLocalTableJoinFunction(mockStreamInputNode, mockTableInputNode, joinRelType);
+ SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
+
+ Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
+ outMsg.getSamzaSqlRelRecord().getFieldNames().size());
+ List<String> expectedFieldNames = new ArrayList<>(streamFieldNames);
+ expectedFieldNames.addAll(tableFieldNames);
+ List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues);
+ expectedFieldValues.addAll(tableFieldValues);
+ Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), expectedFieldValues);
+ }
+
+ @Test
+ public void testWithInnerJoinWithTableOnLeft() {
+ SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+ SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues);
+ JoinRelType joinRelType = JoinRelType.INNER;
+ List<Integer> streamKeyIds = Arrays.asList(0, 2);
+ List<Integer> tableKeyIds = Arrays.asList(0, 2);
+ SamzaSqlRelRecord compositeKey = SamzaSqlRelMessage.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
+ KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
+
+ JoinInputNode mockTableInputNode = mock(JoinInputNode.class);
+ when(mockTableInputNode.getKeyIds()).thenReturn(tableKeyIds);
+ when(mockTableInputNode.isPosOnRight()).thenReturn(false);
+ when(mockTableInputNode.getFieldNames()).thenReturn(tableFieldNames);
+
+ JoinInputNode mockStreamInputNode = mock(JoinInputNode.class);
+ when(mockStreamInputNode.getKeyIds()).thenReturn(streamKeyIds);
+ when(mockStreamInputNode.isPosOnRight()).thenReturn(true);
+ when(mockStreamInputNode.getFieldNames()).thenReturn(streamFieldNames);
+
+ SamzaSqlLocalTableJoinFunction joinFn =
+ new SamzaSqlLocalTableJoinFunction(mockStreamInputNode, mockTableInputNode, joinRelType);
+ SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
+
+ Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
+ outMsg.getSamzaSqlRelRecord().getFieldNames().size());
+ List<String> expectedFieldNames = new ArrayList<>(tableFieldNames);
+ expectedFieldNames.addAll(streamFieldNames);
+ List<Object> expectedFieldValues = new ArrayList<>(tableFieldValues);
+ expectedFieldValues.addAll(streamFieldValues);
+ Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), expectedFieldValues);
+ }
+
+ @Test
+ public void testNullRecordWithInnerJoin() {
+ SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+ JoinRelType joinRelType = JoinRelType.INNER;
+ List<Integer> streamKeyIds = Arrays.asList(0, 1);
+ List<Integer> tableKeyIds = Arrays.asList(2, 3);
+
+ JoinInputNode mockTableInputNode = mock(JoinInputNode.class);
+ when(mockTableInputNode.getKeyIds()).thenReturn(tableKeyIds);
+ when(mockTableInputNode.isPosOnRight()).thenReturn(true);
+ when(mockTableInputNode.getFieldNames()).thenReturn(tableFieldNames);
+
+ JoinInputNode mockStreamInputNode = mock(JoinInputNode.class);
+ when(mockStreamInputNode.getKeyIds()).thenReturn(streamKeyIds);
+ when(mockStreamInputNode.isPosOnRight()).thenReturn(false);
+ when(mockStreamInputNode.getFieldNames()).thenReturn(streamFieldNames);
+
+ SamzaSqlLocalTableJoinFunction joinFn =
+ new SamzaSqlLocalTableJoinFunction(mockStreamInputNode, mockTableInputNode, joinRelType);
+ SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
+ Assert.assertNull(outMsg);
+ }
+
+ @Test
+ public void testNullRecordWithLeftOuterJoin() {
+ SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+ JoinRelType joinRelType = JoinRelType.LEFT;
+ List<Integer> streamKeyIds = Arrays.asList(0, 1);
+ List<Integer> tableKeyIds = Arrays.asList(2, 3);
+
+ JoinInputNode mockTableInputNode = mock(JoinInputNode.class);
+ when(mockTableInputNode.getKeyIds()).thenReturn(tableKeyIds);
+ when(mockTableInputNode.isPosOnRight()).thenReturn(true);
+ when(mockTableInputNode.getFieldNames()).thenReturn(tableFieldNames);
+
+ JoinInputNode mockStreamInputNode = mock(JoinInputNode.class);
+ when(mockStreamInputNode.getKeyIds()).thenReturn(streamKeyIds);
+ when(mockStreamInputNode.isPosOnRight()).thenReturn(false);
+ when(mockStreamInputNode.getFieldNames()).thenReturn(streamFieldNames);
+
+ SamzaSqlLocalTableJoinFunction joinFn =
+ new SamzaSqlLocalTableJoinFunction(mockStreamInputNode, mockTableInputNode, joinRelType);
+ SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
+
+ Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
+ outMsg.getSamzaSqlRelRecord().getFieldNames().size());
+ List<String> expectedFieldNames = new ArrayList<>(streamFieldNames);
+ expectedFieldNames.addAll(tableFieldNames);
+ List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues);
+ expectedFieldValues.addAll(tableFieldNames.stream().map( name -> null ).collect(Collectors.toList()));
+ Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), expectedFieldValues);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java
deleted file mode 100644
index 6362155..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.translator;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.samza.operators.KV;
-import org.apache.samza.sql.SamzaSqlRelRecord;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class TestSamzaSqlRelMessageJoinFunction {
-
- private List<String> streamFieldNames = Arrays.asList("field1", "field2", "field3", "field4");
- private List<Object> streamFieldValues = Arrays.asList("value1", 1, null, "value4");
- private List<String> tableFieldNames = Arrays.asList("field11", "field12", "field13", "field14");
- private List<Object> tableFieldValues = Arrays.asList("value1", 1, null, "value5");
-
- @Test
- public void testWithInnerJoinWithTableOnRight() {
- SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
- SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues);
- JoinRelType joinRelType = JoinRelType.INNER;
- List<Integer> streamKeyIds = Arrays.asList(0, 1);
- List<Integer> tableKeyIds = Arrays.asList(0, 1);
- SamzaSqlRelRecord compositeKey = SamzaSqlRelMessage.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
- KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
-
- SamzaSqlRelMessageJoinFunction joinFn =
- new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableKeyIds, tableFieldNames);
- SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
-
- Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
- outMsg.getSamzaSqlRelRecord().getFieldNames().size());
- List<String> expectedFieldNames = new ArrayList<>(streamFieldNames);
- expectedFieldNames.addAll(tableFieldNames);
- List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues);
- expectedFieldValues.addAll(tableFieldValues);
- Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), expectedFieldValues);
- }
-
- @Test
- public void testWithInnerJoinWithTableOnLeft() {
- SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
- SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues);
- JoinRelType joinRelType = JoinRelType.INNER;
- List<Integer> streamKeyIds = Arrays.asList(0, 2);
- List<Integer> tableKeyIds = Arrays.asList(0, 2);
- SamzaSqlRelRecord compositeKey = SamzaSqlRelMessage.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
- KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
-
- SamzaSqlRelMessageJoinFunction joinFn =
- new SamzaSqlRelMessageJoinFunction(joinRelType, false, streamKeyIds, streamFieldNames,
- tableKeyIds, tableFieldNames);
- SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
-
- Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
- outMsg.getSamzaSqlRelRecord().getFieldNames().size());
- List<String> expectedFieldNames = new ArrayList<>(tableFieldNames);
- expectedFieldNames.addAll(streamFieldNames);
- List<Object> expectedFieldValues = new ArrayList<>(tableFieldValues);
- expectedFieldValues.addAll(streamFieldValues);
- Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), expectedFieldValues);
- }
-
- @Test
- public void testNullRecordWithInnerJoin() {
- SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
- JoinRelType joinRelType = JoinRelType.INNER;
- List<Integer> streamKeyIds = Arrays.asList(0, 1);
- List<Integer> tableKeyIds = Arrays.asList(2, 3);
-
- SamzaSqlRelMessageJoinFunction joinFn =
- new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableKeyIds, tableFieldNames);
- SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
- Assert.assertNull(outMsg);
- }
-
- @Test
- public void testNullRecordWithLeftOuterJoin() {
- SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
- JoinRelType joinRelType = JoinRelType.LEFT;
- List<Integer> streamKeyIds = Arrays.asList(0, 1);
- List<Integer> tableKeyIds = Arrays.asList(2, 3);
-
- SamzaSqlRelMessageJoinFunction joinFn =
- new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames,
- tableKeyIds, tableFieldNames);
- SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
-
- Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
- outMsg.getSamzaSqlRelRecord().getFieldNames().size());
- List<String> expectedFieldNames = new ArrayList<>(streamFieldNames);
- expectedFieldNames.addAll(tableFieldNames);
- List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues);
- expectedFieldValues.addAll(tableFieldNames.stream().map( name -> null ).collect(Collectors.toList()));
- Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), expectedFieldValues);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRemoteTableJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRemoteTableJoinFunction.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRemoteTableJoinFunction.java
new file mode 100644
index 0000000..63419a8
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRemoteTableJoinFunction.java
@@ -0,0 +1,103 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.avro.AvroRelConverter;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
+import org.apache.samza.sql.avro.schemas.SimpleRecord;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
+import org.apache.samza.sql.testutil.SampleRelTableKeyConverter;
+import org.apache.samza.system.SystemStream;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.powermock.api.mockito.PowerMockito.*;
+
+
+public class TestSamzaSqlRemoteTableJoinFunction {
+
+ private List<String> streamFieldNames = Arrays.asList("field1", "field2", "field3", "field4");
+ private List<Object> streamFieldValues = Arrays.asList("value1", 1, null, "value4");
+
+ @Test
+ public void testWithInnerJoinWithTableOnRight() {
+ Map<String, String> props = new HashMap<>();
+ SystemStream ss = new SystemStream("test", "nestedRecord");
+ props.put(
+ String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss.getSystem(), ss.getStream()),
+ SimpleRecord.SCHEMA$.toString());
+ ConfigBasedAvroRelSchemaProviderFactory factory = new ConfigBasedAvroRelSchemaProviderFactory();
+ AvroRelSchemaProvider schemaProvider =
+ (AvroRelSchemaProvider) factory.create(ss, new MapConfig(props));
+ AvroRelConverter relConverter =
+ new AvroRelConverter(ss, schemaProvider, new MapConfig());
+ SamzaRelTableKeyConverter relTableKeyConverter = new SampleRelTableKeyConverter();
+ String remoteTableName = "testDb.testTable.$table";
+
+ GenericData.Record tableRecord = new GenericData.Record(SimpleRecord.SCHEMA$);
+ tableRecord.put("id", 1);
+ tableRecord.put("name", "name1");
+
+ SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+ SamzaSqlRelMessage tableMsg = relConverter.convertToRelMessage(new KV(tableRecord.get("id"), tableRecord));
+ JoinRelType joinRelType = JoinRelType.INNER;
+ List<Integer> streamKeyIds = Arrays.asList(1);
+ List<Integer> tableKeyIds = Arrays.asList(0);
+ KV<Object, GenericRecord> record = KV.of(tableRecord.get("id"), tableRecord);
+
+ JoinInputNode mockTableInputNode = mock(JoinInputNode.class);
+ when(mockTableInputNode.getKeyIds()).thenReturn(tableKeyIds);
+ when(mockTableInputNode.isPosOnRight()).thenReturn(true);
+ when(mockTableInputNode.getFieldNames()).thenReturn(tableMsg.getSamzaSqlRelRecord().getFieldNames());
+ when(mockTableInputNode.getSourceName()).thenReturn(remoteTableName);
+
+ JoinInputNode mockStreamInputNode = mock(JoinInputNode.class);
+ when(mockStreamInputNode.getKeyIds()).thenReturn(streamKeyIds);
+ when(mockStreamInputNode.isPosOnRight()).thenReturn(false);
+ when(mockStreamInputNode.getFieldNames()).thenReturn(streamFieldNames);
+
+ SamzaSqlRemoteTableJoinFunction joinFn =
+ new SamzaSqlRemoteTableJoinFunction(relConverter, relTableKeyConverter, mockStreamInputNode, mockTableInputNode,
+ joinRelType, 0);
+ SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
+
+ Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
+ outMsg.getSamzaSqlRelRecord().getFieldNames().size());
+ List<String> expectedFieldNames = new ArrayList<>(streamFieldNames);
+ expectedFieldNames.addAll(tableMsg.getSamzaSqlRelRecord().getFieldNames());
+ List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues);
+ expectedFieldValues.addAll(tableMsg.getSamzaSqlRelRecord().getFieldValues());
+
+ Assert.assertEquals(expectedFieldNames, outMsg.getSamzaSqlRelRecord().getFieldNames());
+ Assert.assertEquals(expectedFieldValues, outMsg.getSamzaSqlRelRecord().getFieldValues());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index df4020c..77538ff 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -297,6 +297,36 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
}
@Test
+ public void testEndToEndStreamTableInnerJoinWithPrimaryKey() throws Exception {
+ int numMessages = 20;
+
+ TestAvroSystemFactory.messages.clear();
+ Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ staticConfigs.putAll(configs);
+ String sql =
+ "Insert into testavro.enrichedPageViewTopic "
+ + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
+ + " p.name as profileName, p.address as profileAddress "
+ + "from testavro.PROFILE.`$table` as p "
+ + "join testavro.PAGEVIEW as pv "
+ + " on p.__key__ = pv.profileId";
+
+ List<String> sqlStmts = Arrays.asList(sql);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+ SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+ runner.runAndWaitForFinish();
+
+ List<String> outMessages = TestAvroSystemFactory.messages.stream()
+ .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+ + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" :
+ ((GenericRecord) x.getMessage()).get("profileName").toString()))
+ .collect(Collectors.toList());
+ Assert.assertEquals(numMessages, outMessages.size());
+ List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoin(numMessages);
+ Assert.assertEquals(expectedOutMessages, outMessages);
+ }
+
+ @Test
public void testEndToEndStreamTableInnerJoinWithUdf() throws Exception {
int numMessages = 20;
@@ -518,6 +548,37 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
}
@Test
+ public void testEndToEndStreamTableTableJoinWithPrimaryKeys() throws Exception {
+ int numMessages = 20;
+
+ TestAvroSystemFactory.messages.clear();
+ Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ String sql =
+ "Insert into testavro.enrichedPageViewTopic "
+ + "select pv.pageKey as __key__, pv.pageKey as pageKey, c.name as companyName, p.name as profileName,"
+ + " p.address as profileAddress "
+ + "from testavro.PAGEVIEW as pv "
+ + "join testavro.PROFILE.`$table` as p "
+ + " on MyTest(p.__key__) = MyTest(pv.profileId) "
+ + " join testavro.COMPANY.`$table` as c "
+ + " on MyTest(p.companyId) = MyTest(c.__key__)";
+
+ List<String> sqlStmts = Arrays.asList(sql);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+ SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+ runner.runAndWaitForFinish();
+
+ List<String> outMessages = TestAvroSystemFactory.messages.stream()
+ .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+ + ((GenericRecord) x.getMessage()).get("profileName").toString() + ","
+ + ((GenericRecord) x.getMessage()).get("companyName").toString())
+ .collect(Collectors.toList());
+ Assert.assertEquals(numMessages, outMessages.size());
+ List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileCompanyNameJoin(numMessages);
+ Assert.assertEquals(expectedOutMessages, outMessages);
+ }
+
+ @Test
public void testEndToEndStreamTableTableJoinWithCompositeKey() throws Exception {
int numMessages = 20;
[2/2] samza git commit: SAMZA-1616: Samza-Sql - Support remote table
for stream-table join
Posted by sr...@apache.org.
SAMZA-1616: Samza-Sql - Support remote table for stream-table join
Author: Aditya Toomula <at...@linkedin.com>
Reviewers: Srinivasulu Punuru <sp...@linkedin.com>
Closes #794 from atoomula/remote
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/12f421ce
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/12f421ce
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/12f421ce
Branch: refs/heads/master
Commit: 12f421ce212949ff3152dc9e0f1e4f38233dc5b8
Parents: 3da75e6
Author: Aditya Toomula <at...@linkedin.com>
Authored: Fri Nov 9 08:30:59 2018 -0800
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Fri Nov 9 08:30:59 2018 -0800
----------------------------------------------------------------------
.../apache/samza/sql/avro/AvroRelConverter.java | 30 +--
.../interfaces/SamzaRelTableKeyConverter.java | 39 ++++
.../SamzaRelTableKeyConverterFactory.java | 39 ++++
.../samza/sql/interfaces/SqlIOConfig.java | 19 ++
.../samza/sql/runner/SamzaSqlApplication.java | 15 +-
.../sql/runner/SamzaSqlApplicationConfig.java | 19 +-
.../samza/sql/translator/JoinInputNode.java | 76 ++++++
.../samza/sql/translator/JoinTranslator.java | 161 ++++++++-----
.../samza/sql/translator/QueryTranslator.java | 9 +-
.../SamzaSqlLocalTableJoinFunction.java | 54 +++++
.../SamzaSqlRelMessageJoinFunction.java | 126 ----------
.../SamzaSqlRemoteTableJoinFunction.java | 79 +++++++
.../translator/SamzaSqlTableJoinFunction.java | 121 ++++++++++
.../samza/sql/translator/ScanTranslator.java | 12 +
.../samza/sql/translator/TranslatorContext.java | 13 +-
.../samza/sql/e2e/TestSamzaSqlRemoteTable.java | 115 +++++++++
.../apache/samza/sql/e2e/TestSamzaSqlTable.java | 68 ------
.../samza/sql/system/TestAvroSystemFactory.java | 27 ++-
.../RemoteStoreIOResolverTestFactory.java | 147 ++++++++++++
.../testutil/SampleRelTableKeyConverter.java | 36 +++
.../SampleRelTableKeyConverterFactory.java | 41 ++++
.../samza/sql/testutil/SamzaSqlTestConfig.java | 31 ++-
.../sql/testutil/TestIOResolverFactory.java | 234 -------------------
.../sql/translator/TestJoinTranslator.java | 60 +++--
.../sql/translator/TestQueryTranslator.java | 40 ++--
.../TestSamzaSqlLocalTableJoinFunction.java | 161 +++++++++++++
.../TestSamzaSqlRelMessageJoinFunction.java | 121 ----------
.../TestSamzaSqlRemoteTableJoinFunction.java | 103 ++++++++
.../test/samzasql/TestSamzaSqlEndToEnd.java | 61 +++++
29 files changed, 1356 insertions(+), 701 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index 89026ee..e1e1660 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -145,7 +145,7 @@ public class AvroRelConverter implements SamzaRelConverter {
return new KV<>(relMessage.getKey(), convertToGenericRecord(relMessage.getSamzaSqlRelRecord(), payloadSchema));
}
- private GenericRecord convertToGenericRecord(SamzaSqlRelRecord relRecord, Schema schema) {
+ static public GenericRecord convertToGenericRecord(SamzaSqlRelRecord relRecord, Schema schema) {
GenericRecord record = new GenericData.Record(schema);
List<String> fieldNames = relRecord.getFieldNames();
List<Object> values = relRecord.getFieldValues();
@@ -160,7 +160,7 @@ public class AvroRelConverter implements SamzaRelConverter {
return record;
}
- public Object convertToAvroObject(Object relObj, Schema schema) {
+ static public Object convertToAvroObject(Object relObj, Schema schema) {
if (relObj == null) {
return null;
}
@@ -190,6 +190,19 @@ public class AvroRelConverter implements SamzaRelConverter {
}
}
+ // Unions containing two non-nullable types are not yet supported.
+ static public Schema getNonNullUnionSchema(Schema schema) {
+ if (schema.getType().equals(Schema.Type.UNION)) {
+ if (schema.getTypes().get(0).getType() != Schema.Type.NULL) {
+ return schema.getTypes().get(0);
+ }
+ if (schema.getTypes().get(1).getType() != Schema.Type.NULL) {
+ return schema.getTypes().get(1);
+ }
+ }
+ return schema;
+ }
+
// Not doing any validations of data types with Avro schema considering the resource cost per message.
// Casting would fail if the data types are not in sync with the schema.
public Object convertToJavaObject(Object avroObj, Schema schema) {
@@ -238,17 +251,4 @@ public class AvroRelConverter implements SamzaRelConverter {
return avroObj;
}
}
-
- // Two non-nullable types in a union is not yet supported.
- public Schema getNonNullUnionSchema(Schema schema) {
- if (schema.getType().equals(Schema.Type.UNION)) {
- if (schema.getTypes().get(0).getType() != Schema.Type.NULL) {
- return schema.getTypes().get(0);
- }
- if (schema.getTypes().get(1).getType() != Schema.Type.NULL) {
- return schema.getTypes().get(1);
- }
- }
- return schema;
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelTableKeyConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelTableKeyConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelTableKeyConverter.java
new file mode 100644
index 0000000..8af34f7
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelTableKeyConverter.java
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.sql.SamzaSqlRelRecord;
+
+
+/**
+ * {@link org.apache.samza.sql.translator.SamzaSqlRemoteTableJoinFunction} uses {@link SamzaRelTableKeyConverter}
+ * to convert the key to the format expected by the remote table system before doing the table lookup.
+ *
+ * The {@link SamzaRelTableKeyConverter} is configurable at a system level, so it is possible to configure different
+ * {@link SamzaRelTableKeyConverter} for different remote table systems.
+ */
+public interface SamzaRelTableKeyConverter {
+ /**
+ * Convert the key in relational record format to the format expected by the remote table.
+ * @param relKeyRecord key relational record that needs to be converted.
+ * @return the table key
+ */
+ Object convertToTableKeyFormat(SamzaSqlRelRecord relKeyRecord);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelTableKeyConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelTableKeyConverterFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelTableKeyConverterFactory.java
new file mode 100644
index 0000000..aab8410
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelTableKeyConverterFactory.java
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Factory that is used to create a {@link SamzaRelTableKeyConverter}.
+ */
+public interface SamzaRelTableKeyConverterFactory {
+
+ /**
+ * Create a {@link SamzaRelTableKeyConverter}. This method is called when the framework wants to create the
+ * {@link SamzaRelTableKeyConverter} corresponding to the given systemStream.
+ * @param systemStream the systemStream to create a key converter for
+ * @param config config that is used to create the key converter
+ * @return the created {@link SamzaRelTableKeyConverter}
+ */
+ SamzaRelTableKeyConverter create(SystemStream systemStream, Config config);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
index 8636736..3ef1795 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang.Validate;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.system.SystemStream;
@@ -39,6 +40,7 @@ import org.apache.samza.system.SystemStream;
public class SqlIOConfig {
public static final String CFG_SAMZA_REL_CONVERTER = "samzaRelConverterName";
+ public static final String CFG_SAMZA_REL_TABLE_KEY_CONVERTER = "samzaRelTableKeyConverterName";
public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName";
private final String systemName;
@@ -46,6 +48,7 @@ public class SqlIOConfig {
private final String streamName;
private final String samzaRelConverterName;
+ private final String samzaRelTableKeyConverterName;
private final SystemStream systemStream;
private final String source;
@@ -79,6 +82,14 @@ public class SqlIOConfig {
Validate.notEmpty(samzaRelConverterName,
String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
+ if (isRemoteTable()) { // a table key converter is mandatory only when the source is a remote table
+ samzaRelTableKeyConverterName = streamConfigs.get(CFG_SAMZA_REL_TABLE_KEY_CONVERTER);
+ Validate.notEmpty(samzaRelTableKeyConverterName,
+ String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_TABLE_KEY_CONVERTER, systemName));
+ } else { // any other source gets an empty converter name, so the final field is always assigned
+ samzaRelTableKeyConverterName = "";
+ }
+
relSchemaProviderName = streamConfigs.get(CFG_REL_SCHEMA_PROVIDER);
// Removing the Samza SQL specific configs to get the remaining Samza configs.
@@ -114,6 +125,10 @@ public class SqlIOConfig {
return samzaRelConverterName;
}
+ public String getSamzaRelTableKeyConverterName() {
+ return samzaRelTableKeyConverterName;
+ }
+
public String getRelSchemaProviderName() {
return relSchemaProviderName;
}
@@ -133,4 +148,8 @@ public class SqlIOConfig {
public Optional<TableDescriptor> getTableDescriptor() {
return tableDescriptor;
}
+
+ public boolean isRemoteTable() {
+ return tableDescriptor.isPresent() && tableDescriptor.get() instanceof RemoteTableDescriptor;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
index 47d6cf0..b8bb190 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
@@ -25,13 +25,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.calcite.rel.RelRoot;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
-import org.apache.samza.sql.interfaces.SamzaRelConverter;
import org.apache.samza.sql.translator.QueryTranslator;
import org.apache.samza.sql.translator.TranslatorContext;
import org.slf4j.Logger;
@@ -44,7 +42,6 @@ import org.slf4j.LoggerFactory;
public class SamzaSqlApplication implements StreamApplication {
private static final Logger LOG = LoggerFactory.getLogger(SamzaSqlApplication.class);
- private AtomicInteger queryId = new AtomicInteger(0);
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
@@ -69,13 +66,13 @@ public class SamzaSqlApplication implements StreamApplication {
// 3. Translate Calcite plan to Samza stream operators
QueryTranslator queryTranslator = new QueryTranslator(appDescriptor, sqlConfig);
SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(sqlConfig);
- Map<String, SamzaRelConverter> converters = sqlConfig.getSamzaRelConverters();
+ int queryId = 0;
for (RelRoot relRoot : relRoots) {
- LOG.info("Translating relRoot {} to samza stream graph", relRoot);
- int qId = queryId.incrementAndGet();
- TranslatorContext translatorContext = new TranslatorContext(appDescriptor, relRoot, executionContext, converters);
- translatorContextMap.put(qId, translatorContext);
- queryTranslator.translate(relRoot, translatorContext, qId);
+ LOG.info("Translating relRoot {} to samza stream graph with queryId {}", relRoot, queryId);
+ TranslatorContext translatorContext = new TranslatorContext(appDescriptor, relRoot, executionContext);
+ translatorContextMap.put(queryId, translatorContext);
+ queryTranslator.translate(relRoot, translatorContext, queryId);
+ queryId++;
}
// 4. Set all translator contexts
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index dcb5043..6e12c02 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -47,6 +47,8 @@ import org.apache.samza.sql.interfaces.RelSchemaProvider;
import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverterFactory;
import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
import org.apache.samza.sql.interfaces.SqlIOConfig;
@@ -76,6 +78,7 @@ public class SamzaSqlApplicationConfig {
public static final String CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN = "samza.sql.relSchemaProvider.%s.";
public static final String CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN = "samza.sql.relConverter.%s.";
+ public static final String CFG_FMT_SAMZA_REL_TABLE_KEY_CONVERTER_DOMAIN = "samza.sql.relTableKeyConverter.%s.";
public static final String CFG_IO_RESOLVER = "samza.sql.ioResolver";
public static final String CFG_FMT_SOURCE_RESOLVER_DOMAIN = "samza.sql.ioResolver.%s.";
@@ -94,6 +97,7 @@ public class SamzaSqlApplicationConfig {
private final Map<String, RelSchemaProvider> relSchemaProvidersBySource;
private final Map<String, SamzaRelConverter> samzaRelConvertersBySource;
+ private final Map<String, SamzaRelTableKeyConverter> samzaRelTableKeyConvertersBySource;
private SqlIOResolver ioResolver;
private UdfResolver udfResolver;
@@ -135,6 +139,13 @@ public class SamzaSqlApplicationConfig {
CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, (o, c) -> ((SamzaRelConverterFactory) o).create(x.getSystemStream(),
relSchemaProvidersBySource.get(x.getSource()), c))));
+ samzaRelTableKeyConvertersBySource = systemStreamConfigs.stream()
+ .filter(config -> config.isRemoteTable())
+ .collect(Collectors.toMap(SqlIOConfig::getSource,
+ x -> initializePlugin("SamzaRelTableKeyConverter", x.getSamzaRelTableKeyConverterName(),
+ staticConfig, CFG_FMT_SAMZA_REL_TABLE_KEY_CONVERTER_DOMAIN,
+ (o, c) -> ((SamzaRelTableKeyConverterFactory) o).create(x.getSystemStream(), c))));
+
udfResolver = createUdfResolver(staticConfig);
udfMetadata = udfResolver.getUdfs();
@@ -287,12 +298,12 @@ public class SamzaSqlApplicationConfig {
return samzaRelConvertersBySource;
}
- public Map<String, RelSchemaProvider> getRelSchemaProviders() {
- return relSchemaProvidersBySource;
+ public Map<String, SamzaRelTableKeyConverter> getSamzaRelTableKeyConverters() {
+ return samzaRelTableKeyConvertersBySource;
}
- public SqlIOResolver getIoResolver() {
- return ioResolver;
+ public Map<String, RelSchemaProvider> getRelSchemaProviders() {
+ return relSchemaProvidersBySource;
}
public String getMetadataTopicPrefix() {
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
new file mode 100644
index 0000000..d952194
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
@@ -0,0 +1,76 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.List;
+import org.apache.calcite.rel.RelNode;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+
+
+/**
+ * Describes one input of a join (the stream side or the table side): its Calcite node, join key positions, input type and join side.
+ */
+class JoinInputNode {
+
+ // Calcite RelNode corresponding to this join input
+ private final RelNode relNode;
+ // Positions of the join key field(s) within this input's fields
+ private final List<Integer> keyIds;
+
+ private final InputType inputType; // stream, local table or remote table
+ private final boolean isPosOnRight; // supplied by the caller; presumably true when this input is the right side of the join
+
+ enum InputType {
+ STREAM,
+ LOCAL_TABLE,
+ REMOTE_TABLE
+ }
+
+ JoinInputNode(RelNode relNode, List<Integer> keyIds, InputType inputType, boolean isPosOnRight) {
+ this.relNode = relNode;
+ this.keyIds = keyIds;
+ this.inputType = inputType;
+ this.isPosOnRight = isPosOnRight;
+ }
+
+ boolean isRemoteTable() {
+ return this.inputType == InputType.REMOTE_TABLE;
+ }
+
+ List<Integer> getKeyIds() {
+ return keyIds;
+ }
+
+ List<String> getFieldNames() { // field names of the RelNode's row type
+ return relNode.getRowType().getFieldNames();
+ }
+
+ RelNode getRelNode() {
+ return relNode;
+ }
+
+ String getSourceName() { // NOTE(review): dereferences relNode.getTable(); confirm it is non-null for every node this is called on
+ return SqlIOConfig.getSourceFromSourceParts(relNode.getTable().getQualifiedName());
+ }
+
+ boolean isPosOnRight() {
+ return isPosOnRight;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 5f44ff9..113244f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -40,14 +40,15 @@ import org.apache.commons.lang.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.sql.SamzaSqlRelRecord;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
import org.apache.samza.table.Table;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,38 +60,40 @@ import static org.apache.samza.sql.data.SamzaSqlRelMessage.createSamzaSqlComposi
* Translator to translate the LogicalJoin node in the relational graph to the corresponding StreamGraph
* implementation.
* Join is supported with the following caveats:
- * 0. Only local tables are supported. Remote/composite tables are not yet supported.
* 1. Only stream-table joins are supported. No stream-stream joins.
* 2. Only Equi-joins are supported. No theta-joins.
* 3. Inner joins, Left and Right outer joins are supported. No cross joins, full outer joins or natural joins.
* 4. Join condition with a constant is not supported.
* 5. Compound join condition with only AND operator is supported. AND operator with a constant is not supported. No
* support for OR operator or any other operator in the join condition.
- *
- * It is assumed that the stream denoted as 'table' is already partitioned by the key(s) specified in the join
- * condition. We do not repartition the table as bootstrap semantic is not propagated to the intermediate streams.
- * Please refer SAMZA-1613 for more details on this. But we always repartition the stream by the key(s) specified in
- * the join condition.
+ * For local table, we always repartition both the stream to be joined and the stream denoted as table by the key(s)
+ * specified in the join condition.
*/
class JoinTranslator {
private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class);
private int joinId;
- private SqlIOResolver ioResolver;
private final String intermediateStreamPrefix;
+ private final int queryId;
- JoinTranslator(int joinId, SqlIOResolver ioResolver, String intermediateStreamPrefix) {
+ JoinTranslator(int joinId, String intermediateStreamPrefix, int queryId) {
this.joinId = joinId;
- this.ioResolver = ioResolver;
this.intermediateStreamPrefix = intermediateStreamPrefix + (intermediateStreamPrefix.isEmpty() ? "" : "_");
+ this.queryId = queryId;
}
void translate(final LogicalJoin join, final TranslatorContext context) {
+ JoinInputNode.InputType inputTypeOnLeft = getInputType(join.getLeft(), context);
+ JoinInputNode.InputType inputTypeOnRight = getInputType(join.getRight(), context);
// Do the validation of join query
- validateJoinQuery(join);
+ validateJoinQuery(join, inputTypeOnLeft, inputTypeOnRight);
- boolean isTablePosOnRight = isTable(join.getRight());
+ // At this point, one of the sides is a table. Let's figure out if it is on left or right side.
+ boolean isTablePosOnRight = (inputTypeOnRight != JoinInputNode.InputType.STREAM);
+
+ // stream and table keyIds are used to extract the join condition field (key) names and values out of the stream
+ // and table records.
List<Integer> streamKeyIds = new LinkedList<>();
List<Integer> tableKeyIds = new LinkedList<>();
@@ -98,25 +101,46 @@ class JoinTranslator {
populateStreamAndTableKeyIds(((RexCall) join.getCondition()).getOperands(), join, isTablePosOnRight, streamKeyIds,
tableKeyIds);
- Table table = loadLocalTable(isTablePosOnRight, tableKeyIds, join, context);
+ // Get the two input nodes (stream and table nodes) for the join.
+ JoinInputNode streamNode = new JoinInputNode(isTablePosOnRight ? join.getLeft() : join.getRight(), streamKeyIds,
+ isTablePosOnRight ? inputTypeOnLeft : inputTypeOnRight, !isTablePosOnRight);
+ JoinInputNode tableNode = new JoinInputNode(isTablePosOnRight ? join.getRight() : join.getLeft(), tableKeyIds,
+ isTablePosOnRight ? inputTypeOnRight : inputTypeOnLeft, isTablePosOnRight);
+
+ MessageStream<SamzaSqlRelMessage> inputStream = context.getMessageStream(streamNode.getRelNode().getId());
+ Table table = getTable(tableNode, context);
+
+ MessageStream<SamzaSqlRelMessage> outputStream =
+ joinStreamWithTable(inputStream, table, streamNode, tableNode, join, context);
+
+ context.registerMessageStream(join.getId(), outputStream);
+ }
- MessageStream<SamzaSqlRelMessage> inputStream =
- isTablePosOnRight ?
- context.getMessageStream(join.getLeft().getId()) : context.getMessageStream(join.getRight().getId());
+ private MessageStream<SamzaSqlRelMessage> joinStreamWithTable(MessageStream<SamzaSqlRelMessage> inputStream,
+ Table table, JoinInputNode streamNode, JoinInputNode tableNode, LogicalJoin join, TranslatorContext context) {
- List<String> streamFieldNames =
- new ArrayList<>((isTablePosOnRight ? join.getLeft() : join.getRight()).getRowType().getFieldNames());
- List<String> tableFieldNames =
- new ArrayList<>((isTablePosOnRight ? join.getRight() : join.getLeft()).getRowType().getFieldNames());
+ List<Integer> streamKeyIds = streamNode.getKeyIds();
+ List<Integer> tableKeyIds = tableNode.getKeyIds();
Validate.isTrue(streamKeyIds.size() == tableKeyIds.size());
+
log.info("Joining on the following Stream and Table field(s): ");
+ List<String> streamFieldNames = new ArrayList<>(streamNode.getFieldNames());
+ List<String> tableFieldNames = new ArrayList<>(tableNode.getFieldNames());
for (int i = 0; i < streamKeyIds.size(); i++) {
log.info(streamFieldNames.get(streamKeyIds.get(i)) + " with " + tableFieldNames.get(tableKeyIds.get(i)));
}
- SamzaSqlRelMessageJoinFunction joinFn =
- new SamzaSqlRelMessageJoinFunction(join.getJoinType(), isTablePosOnRight, streamKeyIds, streamFieldNames,
- tableKeyIds, tableFieldNames);
+ if (tableNode.isRemoteTable()) {
+ String remoteTableName = tableNode.getSourceName();
+ StreamTableJoinFunction joinFn = new SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
+ context.getTableKeyConverter(remoteTableName), streamNode, tableNode, join.getJoinType(), queryId);
+
+ return inputStream.join(table, joinFn);
+ }
+
+ // Join with the local table
+
+ StreamTableJoinFunction joinFn = new SamzaSqlLocalTableJoinFunction(streamNode, tableNode, join.getJoinType());
SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
(SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
@@ -126,20 +150,17 @@ class JoinTranslator {
// Always re-partition the messages from the input stream by the composite key and then join the messages
// with the table. For the composite key, provide the corresponding table names in the key instead of using
// the names from the stream as the lookup needs to be done based on what is stored in the local table.
- MessageStream<SamzaSqlRelMessage> outputStream =
+ return
inputStream
.partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds,
- getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds)),
- m -> m,
- KVSerde.of(keySerde, valueSerde),
- intermediateStreamPrefix + "stream_" + joinId)
+ getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds)), m -> m, KVSerde.of(keySerde, valueSerde),
+ intermediateStreamPrefix + "stream_" + joinId)
.map(KV::getValue)
.join(table, joinFn);
-
- context.registerMessageStream(join.getId(), outputStream);
}
- private void validateJoinQuery(LogicalJoin join) {
+ private void validateJoinQuery(LogicalJoin join, JoinInputNode.InputType inputTypeOnLeft,
+ JoinInputNode.InputType inputTypeOnRight) {
JoinRelType joinRelType = join.getJoinType();
if (joinRelType.compareTo(JoinRelType.INNER) != 0 && joinRelType.compareTo(JoinRelType.LEFT) != 0
@@ -147,8 +168,8 @@ class JoinTranslator {
throw new SamzaException("Query with only INNER and LEFT/RIGHT OUTER join are supported.");
}
- boolean isTablePosOnLeft = isTable(join.getLeft());
- boolean isTablePosOnRight = isTable(join.getRight());
+ boolean isTablePosOnLeft = (inputTypeOnLeft != JoinInputNode.InputType.STREAM);
+ boolean isTablePosOnRight = (inputTypeOnRight != JoinInputNode.InputType.STREAM);
if (!isTablePosOnLeft && !isTablePosOnRight) {
throw new SamzaException("Invalid query with both sides of join being denoted as 'stream'. "
@@ -221,12 +242,13 @@ class JoinTranslator {
// the sql 'from' clause. Let's put the operand with smaller index in leftRef and larger
// index in rightRef so that the order of operands in the join condition is in the order
// the stream and table are specified in the 'from' clause.
+
RexInputRef leftRef = (RexInputRef) operands.get(0);
RexInputRef rightRef = (RexInputRef) operands.get(1);
// Let's validate the key used in the join condition.
- validateKey(leftRef);
- validateKey(rightRef);
+ validateJoinKeys(leftRef);
+ validateJoinKeys(rightRef);
if (leftRef.getIndex() > rightRef.getIndex()) {
RexInputRef tmpRef = leftRef;
@@ -240,12 +262,14 @@ class JoinTranslator {
tableKeyIds.add(isTablePosOnRight ? deltaKeyIdx : leftRef.getIndex());
}
- private void validateKey(RexInputRef ref) {
+ private void validateJoinKeys(RexInputRef ref) {
SqlTypeName sqlTypeName = ref.getType().getSqlTypeName();
- // Only primitive types are supported in the key
+
+ // Primitive types and ANY (for the record key) are supported in the key
if (sqlTypeName != SqlTypeName.BOOLEAN && sqlTypeName != SqlTypeName.TINYINT && sqlTypeName != SqlTypeName.SMALLINT
&& sqlTypeName != SqlTypeName.INTEGER && sqlTypeName != SqlTypeName.CHAR && sqlTypeName != SqlTypeName.BIGINT
- && sqlTypeName != SqlTypeName.VARCHAR && sqlTypeName != SqlTypeName.DOUBLE && sqlTypeName != SqlTypeName.FLOAT) {
+ && sqlTypeName != SqlTypeName.VARCHAR && sqlTypeName != SqlTypeName.DOUBLE && sqlTypeName != SqlTypeName.FLOAT
+ && sqlTypeName != SqlTypeName.ANY) {
log.error("Unsupported key type " + sqlTypeName + " used in join condition.");
throw new SamzaException("Unsupported key type used in join condition.");
}
@@ -257,9 +281,9 @@ class JoinTranslator {
SqlExplainLevel.EXPPLAN_ATTRIBUTES);
}
- private SqlIOConfig resolveSourceConfigForTable(RelNode relNode) {
+ private SqlIOConfig resolveSourceConfigForTable(RelNode relNode, TranslatorContext context) {
if (relNode instanceof LogicalProject) {
- return resolveSourceConfigForTable(((LogicalProject) relNode).getInput());
+ return resolveSourceConfigForTable(((LogicalProject) relNode).getInput(), context);
}
// We are returning the sourceConfig for the table as null when the table is in another join rather than an output
@@ -268,57 +292,76 @@ class JoinTranslator {
return null;
}
- String sourceName = String.join(".", relNode.getTable().getQualifiedName());
- SqlIOConfig sourceConfig = ioResolver.fetchSourceInfo(sourceName);
+ String sourceName = SqlIOConfig.getSourceFromSourceParts(relNode.getTable().getQualifiedName());
+ SqlIOConfig sourceConfig =
+ context.getExecutionContext().getSamzaSqlApplicationConfig().getInputSystemStreamConfigBySource().get(sourceName);
if (sourceConfig == null) {
throw new SamzaException("Unsupported source found in join statement: " + sourceName);
}
return sourceConfig;
}
- private boolean isTable(RelNode relNode) {
+ private JoinInputNode.InputType getInputType(RelNode relNode, TranslatorContext context) {
+
// NOTE: Any intermediate form of a join is always a stream. Eg: For the second level join of
// stream-table-table join, the left side of the join is join output, which we always
// assume to be a stream. The intermediate stream won't be an instance of EnumerableTableScan.
// The join key(s) for the table could be an udf in which case the relNode would be LogicalProject.
+
if (relNode instanceof EnumerableTableScan || relNode instanceof LogicalProject) {
- SqlIOConfig sourceTableConfig = resolveSourceConfigForTable(relNode);
- return sourceTableConfig != null && sourceTableConfig.getTableDescriptor().isPresent();
+ SqlIOConfig sourceTableConfig = resolveSourceConfigForTable(relNode, context);
+ if (sourceTableConfig == null || !sourceTableConfig.getTableDescriptor().isPresent()) {
+ return JoinInputNode.InputType.STREAM;
+ } else if (sourceTableConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor) {
+ return JoinInputNode.InputType.REMOTE_TABLE;
+ } else {
+ return JoinInputNode.InputType.LOCAL_TABLE;
+ }
} else {
- return false;
+ return JoinInputNode.InputType.STREAM;
}
}
- private Table loadLocalTable(boolean isTablePosOnRight, List<Integer> tableKeyIds, LogicalJoin join, TranslatorContext context) {
- RelNode relNode = isTablePosOnRight ? join.getRight() : join.getLeft();
-
- MessageStream<SamzaSqlRelMessage> relOutputStream = context.getMessageStream(relNode.getId());
+ private Table getTable(JoinInputNode tableNode, TranslatorContext context) {
- SqlIOConfig sourceTableConfig = resolveSourceConfigForTable(relNode);
+ SqlIOConfig sourceTableConfig = resolveSourceConfigForTable(tableNode.getRelNode(), context);
if (sourceTableConfig == null || !sourceTableConfig.getTableDescriptor().isPresent()) {
- String errMsg = "Failed to resolve table source in join operation: node=" + relNode;
+ String errMsg = "Failed to resolve table source in join operation: node=" + tableNode.getRelNode();
log.error(errMsg);
throw new SamzaException(errMsg);
}
- // Create a table backed by RocksDb store with the fields in the join condition as composite key and relational
- // message as the value. Send the messages from the input stream denoted as 'table' to the created table store.
Table<KV<SamzaSqlRelRecord, SamzaSqlRelMessage>> table =
context.getStreamAppDescriptor().getTable(sourceTableConfig.getTableDescriptor().get());
+ if (tableNode.isRemoteTable()) {
+ return table;
+ }
+
+ // If local table, load the table.
+
+ // Load the local table with the fields in the join condition as composite key and relational message as the value.
+ // Send the messages from the input stream denoted as 'table' to the created table store.
+
+ MessageStream<SamzaSqlRelMessage> relOutputStream = context.getMessageStream(tableNode.getRelNode().getId());
+
SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
(SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
(SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
+
+ List<Integer> tableKeyIds = tableNode.getKeyIds();
+
// Let's always repartition by the join fields as key before sending the key and value to the table.
// We need to repartition the stream denoted as table to ensure that both the stream and table that are joined
- // have the same partitioning scheme and partition key.
+ // have the same partitioning scheme with the same partition key and partition count. Please note that bootstrap
+ // semantics are not propagated to the intermediate streams. Please refer to SAMZA-1613 for more details on this.
+ // Consequently, the results are consistent only after the local table is caught up.
+
relOutputStream
- .partitionBy(m -> createSamzaSqlCompositeKey(m, tableKeyIds),
- m -> m,
- KVSerde.of(keySerde, valueSerde),
- intermediateStreamPrefix + "table_" + joinId)
+ .partitionBy(m -> createSamzaSqlCompositeKey(m, tableKeyIds), m -> m,
+ KVSerde.of(keySerde, valueSerde), intermediateStreamPrefix + "table_" + joinId)
.sendTo(table);
return table;
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 4c5f11c..7f3c11e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -104,15 +104,13 @@ public class QueryTranslator {
* For unit testing only
*/
@VisibleForTesting
- public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc) {
+ public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc, int queryId) {
QueryPlanner planner =
new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getSystemStreamConfigsBySource(),
sqlConfig.getUdfMetadata());
final RelRoot relRoot = planner.plan(queryInfo.getSql());
- int queryId = 1;
SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(sqlConfig);
- Map<String, SamzaRelConverter> converters = sqlConfig.getSamzaRelConverters();
- TranslatorContext translatorContext = new TranslatorContext(appDesc, relRoot, executionContext, converters);
+ TranslatorContext translatorContext = new TranslatorContext(appDesc, relRoot, executionContext);
translate(relRoot, translatorContext, queryId);
Map<Integer, TranslatorContext> translatorContexts = new HashMap<>();
translatorContexts.put(queryId, translatorContext.clone());
@@ -124,7 +122,6 @@ public class QueryTranslator {
}
public void translate(RelRoot relRoot, TranslatorContext translatorContext, int queryId) {
- final SqlIOResolver ioResolver = translatorContext.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver();
final RelNode node = relRoot.project();
ScanTranslator scanTranslator =
new ScanTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getInputSystemStreamConfigBySource(), queryId);
@@ -177,7 +174,7 @@ public class QueryTranslator {
public RelNode visit(LogicalJoin join) {
RelNode node = super.visit(join);
joinId++;
- new JoinTranslator(joinId, ioResolver, sqlConfig.getMetadataTopicPrefix())
+ new JoinTranslator(joinId, sqlConfig.getMetadataTopicPrefix(), queryId)
.translate(join, translatorContext);
return node;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlLocalTableJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlLocalTableJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlLocalTableJoinFunction.java
new file mode 100644
index 0000000..f39f39a
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlLocalTableJoinFunction.java
@@ -0,0 +1,54 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.List;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+
+
+/**
+ * Stream-table join function for a local table. Each incoming {@link SamzaSqlRelMessage} is joined with a table
+ * record of type {@code KV<SamzaSqlRelRecord, SamzaSqlRelMessage>}; the join key on both sides is a
+ * {@link SamzaSqlRelRecord}.
+ */
+public class SamzaSqlLocalTableJoinFunction
+    extends SamzaSqlTableJoinFunction<SamzaSqlRelRecord, KV<SamzaSqlRelRecord, SamzaSqlRelMessage>> {
+
+  SamzaSqlLocalTableJoinFunction(JoinInputNode streamNode, JoinInputNode tableNode, JoinRelType joinRelType) {
+    super(streamNode, tableNode, joinRelType);
+  }
+
+  /**
+   * Returns the field values of the rel record held by the value side of the table record.
+   */
+  @Override
+  protected List<Object> getTableRelRecordFieldValues(KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record) {
+    SamzaSqlRelMessage tableMessage = record.getValue();
+    return tableMessage.getSamzaSqlRelRecord().getFieldValues();
+  }
+
+  /**
+   * Returns the join key of a stream message, which for a local table is the composite key rel record itself.
+   */
+  @Override
+  public SamzaSqlRelRecord getMessageKey(SamzaSqlRelMessage message) {
+    SamzaSqlRelRecord compositeKey = getMessageKeyRelRecord(message);
+    return compositeKey;
+  }
+
+  /**
+   * Returns the join key of a table record, which is the key side of the {@link KV}.
+   */
+  @Override
+  public SamzaSqlRelRecord getRecordKey(KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record) {
+    SamzaSqlRelRecord tableKey = record.getKey();
+    return tableKey;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
deleted file mode 100644
index d0a3d11..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.translator;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.commons.lang.Validate;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.functions.StreamTableJoinFunction;
-import org.apache.samza.sql.SamzaSqlRelRecord;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.samza.sql.data.SamzaSqlRelMessage.createSamzaSqlCompositeKey;
-import static org.apache.samza.sql.data.SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames;
-
-
-/**
- * This class joins incoming {@link SamzaSqlRelMessage} from a stream with the records in a table with the join key
- * being {@link SamzaSqlRelRecord}
- */
-public class SamzaSqlRelMessageJoinFunction
- implements StreamTableJoinFunction<SamzaSqlRelRecord, SamzaSqlRelMessage, KV<SamzaSqlRelRecord, SamzaSqlRelMessage>, SamzaSqlRelMessage> {
-
- private static final Logger log = LoggerFactory.getLogger(SamzaSqlRelMessageJoinFunction.class);
-
- private final JoinRelType joinRelType;
- private final boolean isTablePosOnRight;
- private final ArrayList<Integer> streamFieldIds;
- // Table field names are used in the outer join when the table record is not found.
- private final ArrayList<Integer> tableKeyIds;
- private final ArrayList<String> tableFieldNames;
- private final ArrayList<String> outFieldNames;
-
- SamzaSqlRelMessageJoinFunction(JoinRelType joinRelType, boolean isTablePosOnRight,
- List<Integer> streamFieldIds, List<String> streamFieldNames, List<Integer> tableKeyIds,
- List<String> tableFieldNames) {
- this.joinRelType = joinRelType;
- this.isTablePosOnRight = isTablePosOnRight;
- Validate.isTrue((joinRelType.compareTo(JoinRelType.LEFT) == 0 && isTablePosOnRight) ||
- (joinRelType.compareTo(JoinRelType.RIGHT) == 0 && !isTablePosOnRight) ||
- joinRelType.compareTo(JoinRelType.INNER) == 0);
- this.streamFieldIds = new ArrayList<>(streamFieldIds);
- this.tableKeyIds = new ArrayList<>(tableKeyIds);
- this.tableFieldNames = new ArrayList<>(tableFieldNames);
- this.outFieldNames = new ArrayList<>();
- if (isTablePosOnRight) {
- outFieldNames.addAll(streamFieldNames);
- }
- outFieldNames.addAll(tableFieldNames);
- if (!isTablePosOnRight) {
- outFieldNames.addAll(streamFieldNames);
- }
- }
-
- @Override
- public SamzaSqlRelMessage apply(SamzaSqlRelMessage message, KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record) {
-
- if (joinRelType.compareTo(JoinRelType.INNER) == 0 && record == null) {
- log.debug("Inner Join: Record not found for the message with key: " + getMessageKey(message));
- // Returning null would result in Join operator implementation to filter out the message.
- return null;
- }
-
- // The resulting join output should be a SamzaSqlRelMessage containing the fields from both the stream message and
- // table record. The order of stream message fields and table record fields are dictated by the position of stream
- // and table in the 'from' clause of sql query. The output should also include the keys from both the stream message
- // and the table record.
- List<Object> outFieldValues = new ArrayList<>();
-
- // If table position is on the right, add the stream message fields first
- if (isTablePosOnRight) {
- outFieldValues.addAll(message.getSamzaSqlRelRecord().getFieldValues());
- }
-
- // Add the table record fields.
- if (record != null) {
- outFieldValues.addAll(record.getValue().getSamzaSqlRelRecord().getFieldValues());
- } else {
- // Table record could be null as the record could not be found in the store. This can
- // happen for outer joins. Add nulls to all the field values in the output message.
- tableFieldNames.forEach(s -> outFieldValues.add(null));
- }
-
- // If table position is on the left, add the stream message fields last
- if (!isTablePosOnRight) {
- outFieldValues.addAll(message.getSamzaSqlRelRecord().getFieldValues());
- }
-
- return new SamzaSqlRelMessage(outFieldNames, outFieldValues);
- }
-
- @Override
- public SamzaSqlRelRecord getMessageKey(SamzaSqlRelMessage message) {
- return createSamzaSqlCompositeKey(message, streamFieldIds,
- getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds));
- }
-
- @Override
- public SamzaSqlRelRecord getRecordKey(KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record) {
- return record.getKey();
- }
-
- @Override
- public void close() {
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
new file mode 100644
index 0000000..11222da
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
@@ -0,0 +1,79 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.List;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
+
+
+/**
+ * Stream-table join function for a remote table. Each incoming {@link SamzaSqlRelMessage} is joined with a
+ * {@link KV} record read from the remote table; the join key is expressed in the table's own key format, as
+ * produced by a {@link SamzaRelTableKeyConverter}.
+ */
+public class SamzaSqlRemoteTableJoinFunction
+    extends SamzaSqlTableJoinFunction<Object, KV> {
+
+  // Both converters are transient and are looked up again from the translator context in init(Context).
+  // NOTE(review): presumably because converters are not serializable - confirm.
+  private transient SamzaRelConverter msgConverter;
+  private transient SamzaRelTableKeyConverter relTableKeyConverter;
+  private final String tableName;
+  private final int queryId;
+
+  SamzaSqlRemoteTableJoinFunction(SamzaRelConverter msgConverter, SamzaRelTableKeyConverter tableKeyConverter,
+      JoinInputNode streamNode, JoinInputNode tableNode, JoinRelType joinRelType, int queryId) {
+    super(streamNode, tableNode, joinRelType);
+
+    this.queryId = queryId;
+    this.tableName = tableNode.getSourceName();
+    this.relTableKeyConverter = tableKeyConverter;
+    this.msgConverter = msgConverter;
+  }
+
+  /**
+   * Rebinds both converters for this table from the {@link TranslatorContext} registered under this function's
+   * query id in the {@link SamzaSqlApplicationContext}.
+   */
+  @Override
+  public void init(Context context) {
+    SamzaSqlApplicationContext sqlAppContext = (SamzaSqlApplicationContext) context.getApplicationTaskContext();
+    TranslatorContext queryContext = sqlAppContext.getTranslatorContexts().get(queryId);
+    this.msgConverter = queryContext.getMsgConverter(tableName);
+    this.relTableKeyConverter = queryContext.getTableKeyConverter(tableName);
+  }
+
+  /**
+   * Converts the table record to a {@link SamzaSqlRelMessage} with the message converter and returns the field
+   * values of its rel record.
+   */
+  @Override
+  protected List<Object> getTableRelRecordFieldValues(KV record) {
+    SamzaSqlRelMessage tableRelMessage = msgConverter.convertToRelMessage(record);
+    return tableRelMessage.getSamzaSqlRelRecord().getFieldValues();
+  }
+
+  /**
+   * Builds the composite key rel record of the stream message and converts it from rel format to the table's key
+   * format with the table key converter.
+   */
+  @Override
+  public Object getMessageKey(SamzaSqlRelMessage message) {
+    Object tableFormatKey = relTableKeyConverter.convertToTableKeyFormat(getMessageKeyRelRecord(message));
+    return tableFormatKey;
+  }
+
+  /**
+   * Returns the key side of the table record unchanged.
+   */
+  @Override
+  public Object getRecordKey(KV record) {
+    Object tableKey = record.getKey();
+    return tableKey;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
new file mode 100644
index 0000000..bbbf1d6
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
@@ -0,0 +1,121 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.sql.data.SamzaSqlRelMessage.createSamzaSqlCompositeKey;
+import static org.apache.samza.sql.data.SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames;
+
+
+/**
+ * Base class for stream-table join functions that join an incoming {@link SamzaSqlRelMessage} with a record looked
+ * up from a table. Subclasses decide how the join key is derived ({@code getMessageKey} / {@code getRecordKey}) and
+ * how the field values are extracted from a table record ({@link #getTableRelRecordFieldValues(Object)}).
+ *
+ * @param <K> type of the join key
+ * @param <R> type of the table record
+ */
+public abstract class SamzaSqlTableJoinFunction<K, R>
+    implements StreamTableJoinFunction<K, SamzaSqlRelMessage, R, SamzaSqlRelMessage> {
+
+  private static final Logger log = LoggerFactory.getLogger(SamzaSqlTableJoinFunction.class);
+
+  private final JoinRelType joinRelType;
+  private final boolean isTablePosOnRight;
+  private final ArrayList<Integer> streamFieldIds;
+  private final ArrayList<Integer> tableKeyIds;
+  // Table field names are used in the outer join when the table record is not found.
+  private final ArrayList<String> tableFieldNames;
+  private final ArrayList<String> outFieldNames;
+
+  /**
+   * Captures the join type, the key ids of both inputs and the output field order.
+   *
+   * @param streamNode join input describing the stream side (key ids and field names are read from it)
+   * @param tableNode join input describing the table side (position, key ids and field names are read from it)
+   * @param joinRelType type of the join
+   * @throws IllegalArgumentException if the join type and the table position are not a supported combination
+   */
+  SamzaSqlTableJoinFunction(JoinInputNode streamNode, JoinInputNode tableNode, JoinRelType joinRelType) {
+    this.joinRelType = joinRelType;
+    this.isTablePosOnRight = tableNode.isPosOnRight();
+
+    // Outer joins are accepted only when the table is on the side that apply() pads with nulls: LEFT with the table
+    // on the right, RIGHT with the table on the left. INNER joins accept either position.
+    Validate.isTrue((joinRelType == JoinRelType.LEFT && isTablePosOnRight)
+        || (joinRelType == JoinRelType.RIGHT && !isTablePosOnRight)
+        || joinRelType == JoinRelType.INNER,
+        "Unsupported stream-table join: joinRelType=" + joinRelType + ", isTablePosOnRight=" + isTablePosOnRight);
+
+    this.streamFieldIds = new ArrayList<>(streamNode.getKeyIds());
+    this.tableKeyIds = new ArrayList<>(tableNode.getKeyIds());
+    this.tableFieldNames = new ArrayList<>(tableNode.getFieldNames());
+
+    // Output fields follow the order of the inputs in the query: left input first, right input second.
+    this.outFieldNames = new ArrayList<>();
+    if (isTablePosOnRight) {
+      outFieldNames.addAll(streamNode.getFieldNames());
+      outFieldNames.addAll(tableFieldNames);
+    } else {
+      outFieldNames.addAll(tableFieldNames);
+      outFieldNames.addAll(streamNode.getFieldNames());
+    }
+  }
+
+  /**
+   * Joins a stream message with the table record found for its key.
+   *
+   * @param message incoming stream message
+   * @param record table record for the message key, or {@code null} when no record was found
+   * @return the joined message, or {@code null} for an inner join without a matching table record
+   */
+  @Override
+  public SamzaSqlRelMessage apply(SamzaSqlRelMessage message, R record) {
+
+    if (joinRelType == JoinRelType.INNER && record == null) {
+      // Deriving the key (and building the log string) for every unmatched message is wasted work unless debug
+      // logging is actually enabled, so do it only then.
+      if (log.isDebugEnabled()) {
+        log.debug("Inner Join: Record not found for the message with key: " + getMessageKey(message));
+      }
+      // Returning null would result in Join operator implementation to filter out the message.
+      return null;
+    }
+
+    // The resulting join output should be a SamzaSqlRelMessage containing the fields from both the stream message and
+    // table record. The order of stream message fields and table record fields are dictated by the position of stream
+    // and table in the 'from' clause of sql query. The output should also include the keys from both the stream message
+    // and the table record.
+    List<Object> outFieldValues = new ArrayList<>(outFieldNames.size());
+
+    // If table position is on the right, add the stream message fields first
+    if (isTablePosOnRight) {
+      outFieldValues.addAll(message.getSamzaSqlRelRecord().getFieldValues());
+    }
+
+    // Add the table record fields.
+    if (record != null) {
+      outFieldValues.addAll(getTableRelRecordFieldValues(record));
+    } else {
+      // Table record could be null as the record could not be found in the store. This can
+      // happen for outer joins. Add nulls to all the field values in the output message.
+      tableFieldNames.forEach(s -> outFieldValues.add(null));
+    }
+
+    // If table position is on the left, add the stream message fields last
+    if (!isTablePosOnRight) {
+      outFieldValues.addAll(message.getSamzaSqlRelRecord().getFieldValues());
+    }
+
+    return new SamzaSqlRelMessage(outFieldNames, outFieldValues);
+  }
+
+  /**
+   * Extracts the field values of a (non-null) table record, in table field order.
+   *
+   * @param record table record found for the message key
+   * @return field values to place in the table section of the join output
+   */
+  protected abstract List<Object> getTableRelRecordFieldValues(R record);
+
+  /**
+   * Builds the composite key rel record for a stream message from the stream key field ids, using the composite key
+   * field names derived from the table field names and table key ids.
+   *
+   * @param message incoming stream message
+   * @return composite key of the message in rel format
+   */
+  protected SamzaSqlRelRecord getMessageKeyRelRecord(SamzaSqlRelMessage message) {
+    return createSamzaSqlCompositeKey(message, streamFieldIds,
+        getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds));
+  }
+
+  /**
+   * No resources are held by this function, so there is nothing to release.
+   */
+  @Override
+  public void close() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 27138a4..2615aad 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -36,6 +36,7 @@ import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
/**
@@ -92,6 +93,17 @@ class ScanTranslator {
final String streamName = sqlIOConfig.getStreamName();
final String source = sqlIOConfig.getSource();
+ final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent() &&
+ (sqlIOConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor);
+
+ // For remote table, we don't have an input stream descriptor. The table descriptor is already defined by the
+ // SqlIOResolverFactory.
+ // For local table, even though table descriptor is already defined, we still need to create the input stream
+ // descriptor to load the local table.
+ if (isRemoteTable) {
+ return;
+ }
+
KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
DelegatingSystemDescriptor
sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
index 099217b..5990897 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.MessageStream;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
import org.apache.samza.sql.data.RexToJavaCompiler;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
@@ -50,6 +51,7 @@ public class TranslatorContext implements Cloneable {
private final StreamApplicationDescriptor streamAppDesc;
private final RexToJavaCompiler compiler;
private final Map<String, SamzaRelConverter> relSamzaConverters;
+ private final Map<String, SamzaRelTableKeyConverter> relTableKeyConverters;
private final Map<Integer, MessageStream> messageStreams;
private final Map<Integer, RelNode> relNodes;
private final Map<String, DelegatingSystemDescriptor> systemDescriptors;
@@ -124,6 +126,7 @@ public class TranslatorContext implements Cloneable {
this.streamAppDesc = other.streamAppDesc;
this.compiler = other.compiler;
this.relSamzaConverters = other.relSamzaConverters;
+ this.relTableKeyConverters = other.relTableKeyConverters;
this.messageStreams = other.messageStreams;
this.relNodes = other.relNodes;
this.executionContext = other.executionContext.clone();
@@ -136,14 +139,14 @@ public class TranslatorContext implements Cloneable {
* @param streamAppDesc Samza's streamAppDesc that is populated during the translation.
* @param relRoot Root of the relational graph from calcite.
* @param executionContext the execution context
- * @param converters the map of schema to RelData converters
*/
- public TranslatorContext(StreamApplicationDescriptor streamAppDesc, RelRoot relRoot, SamzaSqlExecutionContext executionContext, Map<String, SamzaRelConverter> converters) {
+ public TranslatorContext(StreamApplicationDescriptor streamAppDesc, RelRoot relRoot, SamzaSqlExecutionContext executionContext) {
this.streamAppDesc = streamAppDesc;
this.compiler = createExpressionCompiler(relRoot);
this.executionContext = executionContext;
this.dataContext = new DataContextImpl();
- this.relSamzaConverters = converters;
+ this.relSamzaConverters = executionContext.getSamzaSqlApplicationConfig().getSamzaRelConverters();
+ this.relTableKeyConverters = executionContext.getSamzaSqlApplicationConfig().getSamzaRelTableKeyConverters();
this.messageStreams = new HashMap<>();
this.relNodes = new HashMap<>();
this.systemDescriptors = new HashMap<>();
@@ -212,6 +215,10 @@ public class TranslatorContext implements Cloneable {
return this.relSamzaConverters.get(source);
}
+  /**
+   * Looks up the table key converter registered for the given source.
+   *
+   * @param source source name used as the key into the table key converter map
+   * @return the converter mapped to {@code source}, or {@code null} when the map has no entry for it
+   */
+  SamzaRelTableKeyConverter getTableKeyConverter(String source) {
+    return relTableKeyConverters.get(source);
+  }
+
/**
* This method helps to create a per task instance of translator context
*
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java
new file mode 100644
index 0000000..5bb180e
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java
@@ -0,0 +1,115 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.e2e;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.system.TestAvroSystemFactory;
+import org.apache.samza.sql.testutil.JsonUtil;
+import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.apache.samza.sql.testutil.RemoteStoreIOResolverTestFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * End-to-end tests that run Samza SQL statements writing to and joining against the test remote store.
+ */
+public class TestSamzaSqlRemoteTable {
+
+  /**
+   * Inserting into the remote table with a {@code __key__} column stores one record per input message.
+   */
+  @Test
+  public void testSinkEndToEndWithKey() throws Exception {
+    int numMessages = 20;
+
+    RemoteStoreIOResolverTestFactory.records.clear();
+
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+
+    runSqlStatement(staticConfigs,
+        "Insert into testRemoteStore.testTable.`$table` select __key__, id, name from testavro.SIMPLE1");
+
+    Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size());
+  }
+
+  /**
+   * Inserting into the remote table without a {@code __key__} column is expected to end in an
+   * {@link AssertionError}.
+   */
+  @Test (expected = AssertionError.class)
+  public void testSinkEndToEndWithoutKey() throws Exception {
+    int numMessages = 20;
+
+    RemoteStoreIOResolverTestFactory.records.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+
+    runSqlStatement(staticConfigs,
+        "Insert into testRemoteStore.testTable.`$table`(id,name) select id, name from testavro.SIMPLE1");
+
+    Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size());
+  }
+
+  /**
+   * Joins the page view stream with the remote Profile table and checks the page key / profile name pairs that
+   * were written to the output.
+   */
+  @Test
+  public void testSourceEndToEndWithKey() throws Exception {
+    int numMessages = 20;
+
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+    populateProfileTable(staticConfigs);
+
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
+            + "       p.name as profileName, p.address as profileAddress "
+            + "from testRemoteStore.Profile.`$table` as p "
+            + "join testavro.PAGEVIEW as pv "
+            + " on p.__key__ = pv.profileId";
+
+    runSqlStatement(staticConfigs, sql);
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(envelope -> {
+          GenericRecord outRecord = (GenericRecord) envelope.getMessage();
+          String pageKey = outRecord.get("pageKey").toString();
+          Object profileName = outRecord.get("profileName");
+          return pageKey + "," + (profileName == null ? "null" : profileName.toString());
+        })
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+    List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoin(numMessages);
+    Assert.assertEquals(expectedOutMessages, outMessages);
+  }
+
+  /**
+   * Fills the remote Profile table from the PROFILE stream and checks that one record per message was stored.
+   */
+  private void populateProfileTable(Map<String, String> staticConfigs) {
+    int numMessages = 20;
+
+    RemoteStoreIOResolverTestFactory.records.clear();
+
+    runSqlStatement(staticConfigs, "Insert into testRemoteStore.Profile.`$table` select * from testavro.PROFILE");
+
+    Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size());
+  }
+
+  /**
+   * Stores the statement in the given configs as the only SQL statement, then runs the SQL application and waits
+   * for it to finish.
+   */
+  private void runSqlStatement(Map<String, String> staticConfigs, String sql) {
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    appRunnable.runAndWaitForFinish();
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
deleted file mode 100644
index ec0a993..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.e2e;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
-import org.apache.samza.sql.testutil.JsonUtil;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
-import org.apache.samza.sql.testutil.TestIOResolverFactory;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class TestSamzaSqlTable {
- @Test
- public void testEndToEnd() throws Exception {
- int numMessages = 20;
-
- TestIOResolverFactory.TestTable.records.clear();
-
- Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-
- String sql1 = "Insert into testDb.testTable.`$table`(id,name) select id, name from testavro.SIMPLE1";
- List<String> sqlStmts = Arrays.asList(sql1);
- staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
- SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
- appRunnable.runAndWaitForFinish();
-
- Assert.assertEquals(numMessages, TestIOResolverFactory.TestTable.records.size());
- }
-
- @Test
- public void testEndToEndWithKey() throws Exception {
- int numMessages = 20;
-
- TestIOResolverFactory.TestTable.records.clear();
- Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-
- String sql1 = "Insert into testDb.testTable.`$table`(id,name) select id __key__, name from testavro.SIMPLE1";
- List<String> sqlStmts = Arrays.asList(sql1);
- staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
- SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
- appRunnable.runAndWaitForFinish();
-
- Assert.assertEquals(numMessages, TestIOResolverFactory.TestTable.records.size());
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index 41c809e..9a9e269 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -141,10 +141,10 @@ public class TestAvroSystemFactory implements SystemFactory {
private final int numMessages;
private final boolean includeNullForeignKeys;
private final long sleepBetweenPollsMs;
- private final Set<SystemStreamPartition> simpleRecordMap = new HashSet<>();
- private final Set<SystemStreamPartition> profileRecordMap = new HashSet<>();
- private final Set<SystemStreamPartition> companyRecordMap = new HashSet<>();
- private final Set<SystemStreamPartition> pageViewRecordMap = new HashSet<>();
+ private final Set<SystemStreamPartition> simpleRecordSsps = new HashSet<>();
+ private final Set<SystemStreamPartition> profileRecordSsps = new HashSet<>();
+ private final Set<SystemStreamPartition> companyRecordSsps = new HashSet<>();
+ private final Set<SystemStreamPartition> pageViewRecordSsps = new HashSet<>();
private final Map<SystemStreamPartition, Integer> curMessagesPerSsp = new HashMap<>();
public TestAvroSystemConsumer(String systemName, Config config) {
@@ -165,16 +165,16 @@ public class TestAvroSystemFactory implements SystemFactory {
@Override
public void register(SystemStreamPartition systemStreamPartition, String offset) {
if (systemStreamPartition.getStream().toLowerCase().contains("simple1")) {
- simpleRecordMap.add(systemStreamPartition);
+ simpleRecordSsps.add(systemStreamPartition);
}
if (systemStreamPartition.getStream().toLowerCase().contains("profile")) {
- profileRecordMap.add(systemStreamPartition);
+ profileRecordSsps.add(systemStreamPartition);
}
if (systemStreamPartition.getStream().toLowerCase().contains("company")) {
- companyRecordMap.add(systemStreamPartition);
+ companyRecordSsps.add(systemStreamPartition);
}
if (systemStreamPartition.getStream().toLowerCase().contains("pageview")) {
- pageViewRecordMap.add(systemStreamPartition);
+ pageViewRecordSsps.add(systemStreamPartition);
}
curMessagesPerSsp.put(systemStreamPartition, 0);
}
@@ -202,17 +202,20 @@ public class TestAvroSystemFactory implements SystemFactory {
}
private Object getKey(int index, SystemStreamPartition ssp) {
+ if (profileRecordSsps.contains(ssp) || companyRecordSsps.contains(ssp)) {
+ return index; // Keep this value the same as the profile/company record's "id" field.
+ }
return "key" + index;
}
private Object getData(int index, SystemStreamPartition ssp) {
- if (simpleRecordMap.contains(ssp)) {
+ if (simpleRecordSsps.contains(ssp)) {
return createSimpleRecord(index);
- } else if (profileRecordMap.contains(ssp)) {
+ } else if (profileRecordSsps.contains(ssp)) {
return createProfileRecord(index);
- } else if (companyRecordMap.contains(ssp)) {
+ } else if (companyRecordSsps.contains(ssp)) {
return createCompanyRecord(index);
- } else if (pageViewRecordMap.contains(ssp)) {
+ } else if (pageViewRecordSsps.contains(ssp)) {
return createPageViewRecord(index);
} else {
return createComplexRecord(index);
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/testutil/RemoteStoreIOResolverTestFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/RemoteStoreIOResolverTestFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/RemoteStoreIOResolverTestFactory.java
new file mode 100644
index 0000000..6ee9ee8
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/RemoteStoreIOResolverTestFactory.java
@@ -0,0 +1,147 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
+import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.CFG_METADATA_TOPIC_PREFIX;
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.DEFAULT_METADATA_TOPIC_PREFIX;
+
+
+/**
+ * Test-only {@link SqlIOResolverFactory}. The resolver it creates maps names ending in
+ * {@code $table} to table descriptors: remote tables served by the in-memory
+ * {@link #records} map (all sinks, and sources in {@link #TEST_REMOTE_STORE_SYSTEM}), or a
+ * RocksDB-backed local table for any other source.
+ */
+public class RemoteStoreIOResolverTestFactory implements SqlIOResolverFactory {
+  public static final String TEST_REMOTE_STORE_SYSTEM = "testRemoteStore";
+  public static final String TEST_TABLE_ID = "testTableId";
+
+  /**
+   * In-memory stand-in for the remote store, shared by every read/write function created here.
+   * Entries are keyed by the {@code toString()} form of the table key.
+   */
+  public static Map<Object, Object> records = new HashMap<>();
+
+  /**
+   * Creates a resolver backed by {@code config}; {@code fullConfig} is not used.
+   */
+  @Override
+  public SqlIOResolver create(Config config, Config fullConfig) {
+    return new TestRemoteStoreIOResolver(config);
+  }
+
+  /**
+   * Write function that puts into and removes from
+   * {@link RemoteStoreIOResolverTestFactory#records}, returning already-completed futures.
+   */
+  public static class InMemoryWriteFunction implements TableWriteFunction<Object, Object> {
+
+    @Override
+    public CompletableFuture<Void> putAsync(Object key, Object record) {
+      records.put(key.toString(), record);
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteAsync(Object key) {
+      records.remove(key.toString());
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
+  /**
+   * Read function that looks keys up in {@link RemoteStoreIOResolverTestFactory#records},
+   * returning an already-completed future (holding {@code null} when the key is absent).
+   */
+  static class InMemoryReadFunction implements TableReadFunction<Object, Object> {
+
+    @Override
+    public CompletableFuture<Object> getAsync(Object key) {
+      return CompletableFuture.completedFuture(records.get(key.toString()));
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
+  /**
+   * Resolver that splits a dotted source/sink name into system and stream parts and, for names
+   * ending in {@code $table}, attaches a table descriptor that is created once per name.
+   * Declared static because it never touches the enclosing factory instance.
+   */
+  private static class TestRemoteStoreIOResolver implements SqlIOResolver {
+    private static final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
+    private final Config config;
+    private final Map<String, TableDescriptor> tableDescMap = new HashMap<>();
+    private final String changeLogStorePrefix;
+
+    public TestRemoteStoreIOResolver(Config config) {
+      this.config = config;
+      String metadataTopicPrefix = config.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
+      // A non-empty prefix is separated from the table id by an underscore.
+      this.changeLogStorePrefix = metadataTopicPrefix + (metadataTopicPrefix.isEmpty() ? "" : "_");
+    }
+
+    /**
+     * Builds the IO config for {@code ioName}.
+     *
+     * @param ioName dot-separated name; its first component is the system name
+     * @param isSink whether the name is being resolved as a sink rather than a source
+     * @return the IO config, carrying a table descriptor only when the name ends in {@code $table}
+     */
+    private SqlIOConfig fetchIOInfo(String ioName, boolean isSink) {
+      String[] sourceComponents = ioName.split("\\.");
+      String systemName = sourceComponents[0];
+      int endIdx = sourceComponents.length - 1;
+      int streamIdx = endIdx;
+      TableDescriptor tableDescriptor = null;
+
+      if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
+        // The stream name is the component just before the trailing "$table" keyword.
+        streamIdx = endIdx - 1;
+        // Descriptors are cached by full name, so the first resolution decides source vs. sink.
+        tableDescriptor =
+            tableDescMap.computeIfAbsent(ioName, name -> createTableDescriptor(name, systemName, isSink));
+      }
+
+      Config systemConfigs = config.subset(systemName + ".");
+      return new SqlIOConfig(systemName, sourceComponents[streamIdx],
+          Arrays.asList(sourceComponents), systemConfigs, tableDescriptor);
+    }
+
+    /**
+     * Creates the descriptor for a {@code $table} name: a read/write remote table for sinks, a
+     * read-only remote table for sources in {@link #TEST_REMOTE_STORE_SYSTEM}, and a
+     * changelog-enabled RocksDB table for every other source.
+     */
+    private TableDescriptor createTableDescriptor(String ioName, String systemName, boolean isSink) {
+      // '.' and '$' are replaced with '-' before the name is embedded in a table id.
+      String sanitizedName = ioName.replace(".", "-").replace("$", "-");
+
+      if (isSink) {
+        return new RemoteTableDescriptor<>(TEST_TABLE_ID + "-" + sanitizedName)
+            .withReadFunction(new InMemoryReadFunction())
+            .withWriteFunction(new InMemoryWriteFunction());
+      }
+      if (systemName.equals(TEST_REMOTE_STORE_SYSTEM)) {
+        return new RemoteTableDescriptor<>(TEST_TABLE_ID + "-" + sanitizedName)
+            .withReadFunction(new InMemoryReadFunction());
+      }
+
+      // A local table
+      String tableId = changeLogStorePrefix + "InputTable-" + sanitizedName;
+      SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
+          (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
+      SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
+          (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
+      return new RocksDbTableDescriptor(tableId, KVSerde.of(keySerde, valueSerde)).withChangelogEnabled();
+    }
+
+    @Override
+    public SqlIOConfig fetchSourceInfo(String sourceName) {
+      return fetchIOInfo(sourceName, false);
+    }
+
+    @Override
+    public SqlIOConfig fetchSinkInfo(String sinkName) {
+      return fetchIOInfo(sinkName, true);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverter.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverter.java
new file mode 100644
index 0000000..9f3f8a0
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverter.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.util.stream.Collectors;
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
+
+
+/**
+ * A sample {@link SamzaRelTableKeyConverter} used in tests to convert the join key to table format.
+ */
+public class SampleRelTableKeyConverter implements SamzaRelTableKeyConverter {
+
+  /**
+   * Returns the string form of the first field value of {@code relRecord}.
+   *
+   * <p>Only the first value is read, so null values in the remaining fields do not matter.
+   *
+   * @param relRecord the record holding the join key fields
+   * @return {@code toString()} of the first field value
+   * @throws IndexOutOfBoundsException if the record has no field values
+   * @throws NullPointerException if the first field value is null
+   */
+  @Override
+  public Object convertToTableKeyFormat(SamzaSqlRelRecord relRecord) {
+    return relRecord.getFieldValues().get(0).toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/12f421ce/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverterFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverterFactory.java
new file mode 100644
index 0000000..2853ed4
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverterFactory.java
@@ -0,0 +1,41 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.util.HashMap;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverterFactory;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Test factory for {@link SamzaRelTableKeyConverterFactory}: it hands out one
+ * {@link SampleRelTableKeyConverter} per system stream.
+ */
+public class SampleRelTableKeyConverterFactory implements SamzaRelTableKeyConverterFactory {
+
+  // Converters created so far, keyed by the stream they were requested for.
+  private final HashMap<SystemStream, SamzaRelTableKeyConverter> convertersByStream = new HashMap<>();
+
+  /**
+   * Returns the converter for {@code systemStream}, creating and remembering it on first request.
+   * The {@code config} argument is not consulted.
+   */
+  @Override
+  public SamzaRelTableKeyConverter create(SystemStream systemStream, Config config) {
+    SamzaRelTableKeyConverter converter = convertersByStream.get(systemStream);
+    if (converter == null) {
+      converter = new SampleRelTableKeyConverter();
+      convertersByStream.put(systemStream, converter);
+    }
+    return converter;
+  }
+}