Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/06/18 21:46:16 UTC

[GitHub] [samza] b-slim opened a new pull request #1386: [WIP] Adding support for nested rows access via dot path.

b-slim opened a new pull request #1386:
URL: https://github.com/apache/samza/pull/1386


   This is an initial draft of how to support nested row access in Samza SQL.
   There are multiple interconnected items:
   
   1. Added the actual definition of a ROW Calcite data type [**EASY FINAL**].
   
   2. Added a row converter from the Samza type system to the Calcite type system [**Okay for now, but will need more work for types like timestamps**].
   3. Added a collector for Projects and Filters that are pushed to the remote table scan [**Complex and Needs Discussion**]. 
   
    - Why do we need this? Adding a nested row struct forces the addition of a Project, and in general nothing stops the Calcite logical planner from adding such an identity Project, so this is needed anyway.
   
   - How is this done? For now I chose to minimize the amount of rewriting and refactoring, and added a queue that collects the call stack between the remote table scan and the join node. When doing the join, the Project and Filter are then applied after the join lookup. We still need to handle the case where the filter does not match, and either null-pad the result or return null per the current convention. To be honest, I am still debating whether to add the filter pushdown, since there seems to be no real gain once the lookup has already been done.
   
   4. Need to add more type conversions to support legacy UDFs that operate on non-scalar types and assume everything is a SamzaRelRecord or a Java Map [**Not done yet, maybe a follow-up**].
   5. Need more code cleanup where the map key type is mixed up between Java String and Avro Utf8 [**WIP**].
   6. Need more work on the union type case where there are more than 2 types [**Follow-up**].
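   Item 5 concerns maps whose keys are sometimes `String` and sometimes Avro `Utf8`. Because such a key never `equals()` a `String`, `map.get("name")` silently returns null. The sketch below illustrates one way to normalize keys. `Utf8Like` is a stand-in assumed here only to mimic that `equals()` behavior without an Avro dependency, and `MapKeys.normalizeKeys` is a hypothetical helper, not code from this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for Avro's Utf8 (assumption for illustration): a CharSequence whose
// equals() never matches a String, even when the text is identical.
final class Utf8Like implements CharSequence {
  private final String value;

  Utf8Like(String value) {
    this.value = value;
  }

  @Override public int length() { return value.length(); }
  @Override public char charAt(int index) { return value.charAt(index); }
  @Override public CharSequence subSequence(int start, int end) {
    return new Utf8Like(value.substring(start, end));
  }
  @Override public String toString() { return value; }
  @Override public boolean equals(Object o) {
    return o instanceof Utf8Like && ((Utf8Like) o).value.equals(value);
  }
  @Override public int hashCode() { return value.hashCode(); }
}

// Hypothetical helper: rebuilds a map so that every key is a java.lang.String.
final class MapKeys {
  static Map<String, Object> normalizeKeys(Map<?, ?> raw) {
    Map<String, Object> normalized = new HashMap<>();
    for (Map.Entry<?, ?> entry : raw.entrySet()) {
      // String.valueOf delegates to toString(), which yields the key text for
      // both String keys and Utf8-like keys.
      normalized.put(String.valueOf(entry.getKey()), entry.getValue());
    }
    return normalized;
  }
}
```

   Copying the map costs an allocation per record. The alternative is to convert lookup keys at every access site, which is easy to miss in one of them.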
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] b-slim commented on a change in pull request #1386: [SAMZA-2557] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
b-slim commented on a change in pull request #1386:
URL: https://github.com/apache/samza/pull/1386#discussion_r457602488



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
##########
@@ -110,46 +114,50 @@ private void registerSourceSchemas(SchemaPlus rootSchema) {
   }
 
   public RelRoot plan(String query) {
-    try {
-      Connection connection = DriverManager.getConnection("jdbc:calcite:");
-      CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
-      SchemaPlus rootSchema = calciteConnection.getRootSchema();
-      registerSourceSchemas(rootSchema);
-
-      List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions = udfMetadata.stream()
-          .map(x -> new SamzaSqlScalarFunctionImpl(x))
-          .collect(Collectors.toList());
-
-      final List<RelTraitDef> traitDefs = new ArrayList<>();
-
-      traitDefs.add(ConventionTraitDef.INSTANCE);
-      traitDefs.add(RelCollationTraitDef.INSTANCE);
-
-      List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
-      sqlOperatorTables.add(new SamzaSqlOperatorTable());
-      sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
-
-      // Using lenient so that !=,%,- are allowed.
-      FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
-          .parserConfig(SqlParser.configBuilder()
-              .setLex(Lex.JAVA)
-              .setConformance(SqlConformanceEnum.LENIENT)
-              .setCaseSensitive(false) // Make Udfs case insensitive
-              .build())
-          .defaultSchema(rootSchema)
-          .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
-          .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
-          .traitDefs(traitDefs)
-          .context(Contexts.EMPTY_CONTEXT)
-          .costFactory(null)
-          .build();
-      Planner planner = Frameworks.getPlanner(frameworkConfig);
-
+    SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, false).plus();

Review comment:
       We do not need caching. It is a historical optimization flag that is not really needed in most cases, because the schema changes all the time. In fact, that is the default in Calcite.







[GitHub] [samza] b-slim commented on a change in pull request #1386: [WIP] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
b-slim commented on a change in pull request #1386:
URL: https://github.com/apache/samza/pull/1386#discussion_r444587745



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.translator;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.function.Function;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.functions.AsyncFlatMapFunction;
+import org.apache.samza.operators.functions.ClosableFunction;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.operators.windows.Window;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.table.Table;
+
+
+/**
+ * Collector of Map and Filter Samza Function, used to collect current call stack and trigger it when applying the join function.
+ *
+ * @TODO This class is a work around here to minimize the amount of code changes, but in an ideal world,
+ * @TODO where we use Calcite planner in conventional way we can combine function when via translation of RelNodes.
+ */
+class MessageStreamCollector implements MessageStream<SamzaSqlRelMessage>, Serializable, Closeable {
+
+  private final Deque<MapFunction<? super SamzaSqlRelMessage, ? extends SamzaSqlRelMessage>> _mapFnCallQueue =
+      new ArrayDeque<>();
+  private final Deque<ClosableFunction> _closingStack = new ArrayDeque<>();
+
+  @Override
+  public <OM> MessageStream<OM> map(MapFunction<? super SamzaSqlRelMessage, ? extends OM> mapFn) {
+    _mapFnCallQueue.offer((MapFunction<? super SamzaSqlRelMessage, ? extends SamzaSqlRelMessage>) mapFn);
+    return (MessageStream<OM>) this;
+  }
+
+  @Override
+  public MessageStream<SamzaSqlRelMessage> filter(FilterFunction<? super SamzaSqlRelMessage> filterFn) {
+    _mapFnCallQueue.offer(new FilterMapAdapter(filterFn));
+    return this;
+  }
+
+   Function<SamzaSqlRelMessage, SamzaSqlRelMessage> getFunction(Context context) {
+    Function<SamzaSqlRelMessage, SamzaSqlRelMessage> tailFn = null;
+    while (!_mapFnCallQueue.isEmpty()) {
+      MapFunction<? super SamzaSqlRelMessage, ? extends SamzaSqlRelMessage> f = _mapFnCallQueue.poll();
+      f.init(context);
+      _closingStack.push(f);
+      Function<SamzaSqlRelMessage, SamzaSqlRelMessage> current = x -> {
+        if (x != null) {
+          return f.apply(x);
+        }
+        return null;
+      };
+      if (tailFn == null) {
+        tailFn = current;
+      } else {
+        tailFn = current.compose(tailFn);
+      }
+    }
+    return tailFn == null ? Function.identity() : tailFn;
+  }
+
+  private static class FilterMapAdapter implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {

Review comment:
       Added comments; let me know if it is still unclear.







[GitHub] [samza] b-slim commented on a change in pull request #1386: [WIP] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
b-slim commented on a change in pull request #1386:
URL: https://github.com/apache/samza/pull/1386#discussion_r444587569



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java
##########
@@ -0,0 +1,190 @@
+   Function<SamzaSqlRelMessage, SamzaSqlRelMessage> getFunction(Context context) {
+    Function<SamzaSqlRelMessage, SamzaSqlRelMessage> tailFn = null;
+    while (!_mapFnCallQueue.isEmpty()) {

Review comment:
       @srinipunuru I added comments at the top of the class; let me know if this is still unclear.







[GitHub] [samza] srinipunuru commented on a change in pull request #1386: [WIP] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
srinipunuru commented on a change in pull request #1386:
URL: https://github.com/apache/samza/pull/1386#discussion_r443631316



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java
##########
@@ -0,0 +1,190 @@
+  private static class FilterMapAdapter implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {

Review comment:
       Can you add comments here as well on why this adapter is required and what it does?
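   The adapter's purpose can be illustrated in isolation. A filter is rewritten as a map step that returns the message unchanged when the predicate holds and null otherwise, so filters and projections can sit in one queue of same-typed stages. This is a minimal sketch using `java.util.function` types in place of Samza's `FilterFunction`/`MapFunction`. `FilterAsMap` is a hypothetical name, not the PR's `FilterMapAdapter`.

```java
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch: expresses a filter as a map step so that filters and
// projections can share one queue of Function<T, T> stages.
final class FilterAsMap<T> implements Function<T, T> {
  private final Predicate<? super T> predicate;

  FilterAsMap(Predicate<? super T> predicate) {
    this.predicate = predicate;
  }

  // Passes the message through unchanged when it matches; returns null
  // ("filtered out") otherwise, so later stages can short-circuit on null.
  @Override
  public T apply(T message) {
    return predicate.test(message) ? message : null;
  }
}
```

   The trade-off is that null becomes the "dropped" signal. Every downstream stage therefore has to treat null as "skip", not as a value.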

##########
File path: samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
##########
@@ -141,8 +141,16 @@ void translate(final LogicalJoin join, final TranslatorContext translatorContext
 
     if (tableNode.isRemoteTable()) {
       String remoteTableName = tableNode.getSourceName();
-      StreamTableJoinFunction joinFn = new SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
-          context.getTableKeyConverter(remoteTableName), streamNode, tableNode, join.getJoinType(), queryId);
+      MessageStream operatorStack = context.getMessageStream(tableNode.getRelNode().getId());

Review comment:
       Looks like this is where the magic is happening, Can you add some comments describing what we are doing and why we are doing this?
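   Based on the PR description, the Project and Filter stages collected between the remote table scan and the join run after the lookup. A record they reject is either null-padded or dropped. The step might look roughly like the sketch below. `PostLookupJoin`, `Joined` and the two-value `JoinType` are hypothetical simplifications, not the `JoinTranslator` or `SamzaSqlRemoteTableJoinFunction` code.

```java
import java.util.function.Function;

// Hypothetical sketch: after a remote-table lookup, run the Project/Filter
// stages collected between the table scan and the join, then apply the join's
// null semantics to the result.
final class PostLookupJoin {
  enum JoinType { INNER, LEFT }

  // Simple pair holder for the joined output; record == null means "null padded".
  static final class Joined<S, R> {
    final S stream;
    final R record;

    Joined(S stream, R record) {
      this.stream = stream;
      this.record = record;
    }
  }

  static <S, R> Joined<S, R> join(S streamMsg, R lookedUp, Function<R, R> collectedStages, JoinType type) {
    // A missing lookup result stays missing; otherwise run the collected stages.
    R projected = lookedUp == null ? null : collectedStages.apply(lookedUp);
    if (projected == null) {
      // Either no match in the table or a collected filter rejected the record.
      if (type == JoinType.LEFT) {
        return new Joined<>(streamMsg, null);
      }
      return null; // inner join: emit nothing
    }
    return new Joined<>(streamMsg, projected);
  }
}
```

   This is also why pushing the filter down gains little here: the lookup has already been paid for by the time the filter runs.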

##########
File path: samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java
##########
@@ -0,0 +1,190 @@
+   Function<SamzaSqlRelMessage, SamzaSqlRelMessage> getFunction(Context context) {
+    Function<SamzaSqlRelMessage, SamzaSqlRelMessage> tailFn = null;
+    while (!_mapFnCallQueue.isEmpty()) {

Review comment:
       Can you add more comments on what we are doing here?
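   The loop in `getFunction` drains a FIFO queue and folds each stage into one function with null short-circuiting. The diff writes this as `current.compose(tailFn)`, which is equivalent to `tailFn.andThen(current)`: earlier stages run first. The stand-alone sketch below reproduces only that ordering idea. `StageChain` is a hypothetical name, and the `init`/close handling from the diff is omitted.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;

// Hypothetical re-creation of the chaining idea: stages are offered to a FIFO
// queue in the order the translator visits them, then folded into one Function
// where a null result from any stage skips all remaining stages.
final class StageChain<T> {
  private final Deque<Function<T, T>> queue = new ArrayDeque<>();

  StageChain<T> add(Function<T, T> stage) {
    queue.offer(stage);
    return this;
  }

  Function<T, T> build() {
    // An empty queue yields the identity, as in the diff's fallback.
    Function<T, T> chain = Function.identity();
    while (!queue.isEmpty()) {
      Function<T, T> stage = queue.poll();
      Function<T, T> nullSafe = x -> x == null ? null : stage.apply(x);
      // chain.andThen(nullSafe) is the same as nullSafe.compose(chain).
      chain = chain.andThen(nullSafe);
    }
    return chain;
  }
}
```

   For example, a chain of "append a", "drop if longer than 3" and "append b" maps `"x"` to `"xab"` and `"xyz"` to null.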







[GitHub] [samza] b-slim commented on a change in pull request #1386: [SAMZA-2557] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
b-slim commented on a change in pull request #1386:
URL: https://github.com/apache/samza/pull/1386#discussion_r458281080



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.udf;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.lang.reflect.Type;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.calcite.adapter.enumerable.CallImplementor;
+import org.apache.calcite.adapter.enumerable.EnumUtils;
+import org.apache.calcite.adapter.enumerable.NullPolicy;
+import org.apache.calcite.adapter.enumerable.RexImpTable;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.tree.ConstantExpression;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.ExpressionType;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.FunctionParameter;
+import org.apache.calcite.schema.ImplementableFunction;
+import org.apache.calcite.schema.ScalarFunction;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+
+import static org.apache.calcite.schema.impl.ReflectiveFunctionBase.builder;
+
+
+/**
+ * Operator to extract nested Rows or Fields form a struct row type using a dotted path.
+ * The goal of this operator is two-fold.
+ * First it is a temporary fix for https://issues.apache.org/jira/browse/CALCITE-4065 to extract a row from a row.
+ * Second it will enable smooth backward compatible migration from existing udf that relies on legacy row format.

Review comment:
       see https://github.com/apache/samza/pull/1386/commits/3e95902b615bd90ec686902fadbab9f98c03a75c







[GitHub] [samza] atoomula commented on a change in pull request #1386: [SAMZA-2557] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
atoomula commented on a change in pull request #1386:
URL: https://github.com/apache/samza/pull/1386#discussion_r457612651



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java
##########
@@ -0,0 +1,166 @@
+/**
+ * Operator to extract nested Rows or Fields form a struct row type using a dotted path.
+ * The goal of this operator is two-fold.
+ * First it is a temporary fix for https://issues.apache.org/jira/browse/CALCITE-4065 to extract a row from a row.
+ * Second it will enable smooth backward compatible migration from existing udf that relies on legacy row format.

Review comment:
       In the above test TestGetSqlFieldUdf, can you take a look to see whether it handles the tests from testMapAtLastField through testArrayAtAllIntermediateFields? I don't think we have any tests in TestSamzaSqlEndToEnd that exercise such complex types at intermediate fields.
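   The cases in question involve maps and arrays appearing partway along a path, not only at the last field. The sketch below shows what such traversal has to handle. The syntax is an assumption made purely for illustration: dot-separated segments, where a numeric segment indexes into a list. It is hypothetical and is not the syntax of `GetSqlFieldUdf` or `GetNestedField`.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not GetSqlFieldUdf or GetNestedField): resolves a dotted
// path where any intermediate or final value may be a Map (keyed by field name)
// or a List (indexed by a numeric segment). Returns null when any step is missing.
final class DottedPath {
  static Object resolve(Object root, String path) {
    Object current = root;
    for (String segment : path.split("\\.")) {
      if (current instanceof Map) {
        current = ((Map<?, ?>) current).get(segment);
      } else if (current instanceof List && segment.matches("\\d{1,9}")) {
        List<?> list = (List<?>) current;
        int index = Integer.parseInt(segment);
        current = index < list.size() ? list.get(index) : null;
      } else {
        // A scalar was reached before the path ended, or a list segment is not an index.
        return null;
      }
      if (current == null) {
        return null;
      }
    }
    return current;
  }
}
```

   Test cases along these lines would cover a map or array at the last field as well as at every intermediate field.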







[GitHub] [samza] b-slim commented on a change in pull request #1386: [WIP] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
b-slim commented on a change in pull request #1386:
URL: https://github.com/apache/samza/pull/1386#discussion_r443023108



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
##########
@@ -141,20 +142,28 @@ public RelRoot plan(String query) {
           .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
           .traitDefs(traitDefs)
           .context(Contexts.EMPTY_CONTEXT)
-          .costFactory(null)
+          .programs(
+              Programs.hep(ImmutableList.of(FilterJoinRule.FILTER_ON_JOIN), true, DefaultRelMetadataProvider.INSTANCE))

Review comment:
       @atoomula this rule is not really the main part of this work, and I don't think we need to fix or choose now which rule should handle filters, or whether filters should be pushed to the table scan; that is beyond the scope of this PR. 
   The main goal is to explore how we can handle the **operator stack between the join and the remote table scan**. 
   I added the rule to ensure that things work as expected when a filter is present, and it seems to be working. But I would love to know what **blind spot(s)** I am missing about what does and does not work well with the remote table scan, if you think that will help drive this issue. Thanks







[GitHub] [samza] atoomula commented on a change in pull request #1386: [SAMZA-2557] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
atoomula commented on a change in pull request #1386:
URL: https://github.com/apache/samza/pull/1386#discussion_r457129009



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
##########
@@ -110,46 +114,50 @@ private void registerSourceSchemas(SchemaPlus rootSchema) {
   }
 
   public RelRoot plan(String query) {
-    try {
-      Connection connection = DriverManager.getConnection("jdbc:calcite:");
-      CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
-      SchemaPlus rootSchema = calciteConnection.getRootSchema();
-      registerSourceSchemas(rootSchema);
-
-      List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions = udfMetadata.stream()
-          .map(x -> new SamzaSqlScalarFunctionImpl(x))
-          .collect(Collectors.toList());
-
-      final List<RelTraitDef> traitDefs = new ArrayList<>();
-
-      traitDefs.add(ConventionTraitDef.INSTANCE);
-      traitDefs.add(RelCollationTraitDef.INSTANCE);
-
-      List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
-      sqlOperatorTables.add(new SamzaSqlOperatorTable());
-      sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
-
-      // Using lenient so that !=,%,- are allowed.
-      FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
-          .parserConfig(SqlParser.configBuilder()
-              .setLex(Lex.JAVA)
-              .setConformance(SqlConformanceEnum.LENIENT)
-              .setCaseSensitive(false) // Make Udfs case insensitive
-              .build())
-          .defaultSchema(rootSchema)
-          .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
-          .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
-          .traitDefs(traitDefs)
-          .context(Contexts.EMPTY_CONTEXT)
-          .costFactory(null)
-          .build();
-      Planner planner = Frameworks.getPlanner(frameworkConfig);
-
+    SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, false).plus();
+    registerSourceSchemas(rootSchema);
+
+    List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions =
+        udfMetadata.stream().map(x -> new SamzaSqlScalarFunctionImpl(x)).collect(Collectors.toList());
+
+    final List<RelTraitDef> traitDefs = new ArrayList<>();
+
+    traitDefs.add(ConventionTraitDef.INSTANCE);
+    traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+    List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+    sqlOperatorTables.add(new SamzaSqlOperatorTable());
+    sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
+
+    // Using lenient so that !=,%,- are allowed.
+    FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
+        .parserConfig(SqlParser.configBuilder()
+            .setLex(Lex.JAVA)
+            .setConformance(SqlConformanceEnum.LENIENT)
+            .setCaseSensitive(false) // Make Udfs case insensitive
+            .build())
+        .defaultSchema(rootSchema)
+        .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+        .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
+        .traitDefs(traitDefs)
+        .context(Contexts.EMPTY_CONTEXT)
+        .programs(
+            Programs.hep(ImmutableList.of(FilterJoinRule.FILTER_ON_JOIN), true, DefaultRelMetadataProvider.INSTANCE))
+        .build();
+
+    // Planner is AutoCloseable
+    try (Planner planner = Frameworks.getPlanner(frameworkConfig)) {
       SqlNode sql = planner.parse(query);
       SqlNode validatedSql = planner.validate(sql);
       RelRoot relRoot = planner.rel(validatedSql);
       LOG.info("query plan:\n" + RelOptUtil.toString(relRoot.rel, SqlExplainLevel.ALL_ATTRIBUTES));
-      return relRoot;
+      RelTraitSet relTraitSet = RelTraitSet.createEmpty();

Review comment:
       Could you pull in the latest master branch code? I have pushed query optimization code that conflicts with your changes.







[GitHub] [samza] b-slim commented on a change in pull request #1386: [SAMZA-2557] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
b-slim commented on a change in pull request #1386:
URL: https://github.com/apache/samza/pull/1386#discussion_r458280724



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.udf;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.lang.reflect.Type;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.calcite.adapter.enumerable.CallImplementor;
+import org.apache.calcite.adapter.enumerable.EnumUtils;
+import org.apache.calcite.adapter.enumerable.NullPolicy;
+import org.apache.calcite.adapter.enumerable.RexImpTable;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.tree.ConstantExpression;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.ExpressionType;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.FunctionParameter;
+import org.apache.calcite.schema.ImplementableFunction;
+import org.apache.calcite.schema.ScalarFunction;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+
+import static org.apache.calcite.schema.impl.ReflectiveFunctionBase.builder;
+
+
+/**
+ * Operator to extract nested Rows or Fields from a struct row type using a dotted path.
+ * The goal of this operator is two-fold.
+ * First, it is a temporary fix for https://issues.apache.org/jira/browse/CALCITE-4065 to extract a row from a row.
+ * Second, it will enable a smooth, backward-compatible migration from existing UDFs that rely on the legacy row format.

Review comment:
       Added more tests for this to mimic the unit tests. As you can see, they do not cover how the legacy UDF handles maps, so such statements will definitely need to be rewritten when moving to new releases.
   







[GitHub] [samza] atoomula commented on a change in pull request #1386: [SAMZA-2557] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
atoomula commented on a change in pull request #1386:
URL: https://github.com/apache/samza/pull/1386#discussion_r457427444



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java
##########
@@ -0,0 +1,166 @@
+/**
+ * Operator to extract nested Rows or Fields from a struct row type using a dotted path.
+ * The goal of this operator is two-fold.
+ * First, it is a temporary fix for https://issues.apache.org/jira/browse/CALCITE-4065 to extract a row from a row.
+ * Second, it will enable a smooth, backward-compatible migration from existing UDFs that rely on the legacy row format.

Review comment:
       Is this truly backward compatible with the existing GetNestedField UDF? Does it support all the types that are tested in GetSqlFieldUdf, especially nested maps and arrays? https://github.com/apache/samza/blob/dcd4b558a2c702f5b5a320fdb9d0c3fcadabd09b/samza-sql/src/test/java/org/apache/samza/sql/fn/TestGetSqlFieldUdf.java







[GitHub] [samza] b-slim commented on a change in pull request #1386: [SAMZA-2557] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
b-slim commented on a change in pull request #1386:
URL: https://github.com/apache/samza/pull/1386#discussion_r457601740



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java
##########
@@ -0,0 +1,166 @@
+/**
+ * Operator to extract nested Rows or Fields from a struct row type using a dotted path.
+ * The goal of this operator is two-fold.
+ * First, it is a temporary fix for https://issues.apache.org/jira/browse/CALCITE-4065 to extract a row from a row.
+ * Second, it will enable a smooth, backward-compatible migration from existing UDFs that rely on the legacy row format.

Review comment:
       It should be; I ran all the tests and they passed. If you have any test or use case in mind, please bring it up. This should work with all the types, and it enforces type checking, as opposed to the old UDF, which is typeless in a bunch of corner cases. In a nutshell, some failures will happen if the types do not match.
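   To illustrate the difference described above, here is a minimal sketch of typed dotted-path access that fails fast on a type mismatch instead of returning whatever object it finds. `TypedRow` is a hypothetical stand-in for a ROW value with a declared schema; it is not the GetNestedField implementation or a Calcite class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical row with declared field types; illustrative only, not a Calcite or Samza class. */
final class TypedRow {
  private final Map<String, Class<?>> schema = new LinkedHashMap<>();
  private final Map<String, Object> values = new LinkedHashMap<>();

  /** Declares a field and stores its value; rejects a value that does not match the declared type. */
  TypedRow put(String name, Class<?> type, Object value) {
    if (value != null && !type.isInstance(value)) {
      throw new IllegalArgumentException(name + " is declared as " + type.getSimpleName());
    }
    schema.put(name, type);
    values.put(name, value);
    return this;
  }

  /** Walks a dotted path; every intermediate field must be declared as a TypedRow. */
  <T> T get(String path, Class<T> expected) {
    String[] segments = path.split("\\.");
    TypedRow row = this;
    for (int i = 0; i < segments.length - 1; i++) {
      row = row.field(segments[i], TypedRow.class);
      if (row == null) {
        return null; // a null intermediate row yields null, like SQL field access
      }
    }
    return row.field(segments[segments.length - 1], expected);
  }

  private <T> T field(String name, Class<T> expected) {
    Class<?> declared = schema.get(name);
    if (declared == null) {
      throw new IllegalArgumentException("unknown field: " + name);
    }
    if (!expected.isAssignableFrom(declared)) {
      // The mismatch is reported against the declared type, not discovered later at runtime
      throw new IllegalArgumentException(
          name + " is " + declared.getSimpleName() + ", not " + expected.getSimpleName());
    }
    return expected.cast(values.get(name));
  }
}
```

   The trade-off is the one mentioned in the comment: queries that silently worked against the typeless UDF may now fail, but the failure points at the field whose type does not match.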







[GitHub] [samza] b-slim commented on a change in pull request #1386: [SAMZA-2557] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
b-slim commented on a change in pull request #1386:
URL: https://github.com/apache/samza/pull/1386#discussion_r449097622



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.translator;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.function.Function;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.functions.AsyncFlatMapFunction;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.operators.windows.Window;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.table.Table;
+
+
+/**
+ * Collector of Map and Filter Samza functions that collects the call stack on top of a Remote Table.
+ * This Collector is used by the Join operator, which triggers it when applying the join function after the lookup.
+ *
+ * Note that this is needed because the Remote Table cannot expose a proper {@code MessageStream}.
+ * It is a workaround to minimize the amount of code changes to the current Query Translator {@link org.apache.samza.sql.translator.QueryTranslator}.
+ * In an ideal world, we would use the Calcite planner in the conventional way and combine functions via the translation of RelNodes.

Review comment:
       The conventional way of using Calcite is to push operators toward the table scan as much as possible. 
   This PR does that by leveraging Calcite as is, with minimal code changes and without copying and pasting internal code. 
   Pushing some operators up in the remote table case and down in the other cases shows a clear disconnect and an unclear design; anyone other than the author would have to spend hours stepping through the code in a debugger to understand it. 
   The other question is how that would be better than the current approach of composing the maps/filters and fusing them into the join operator. Don't you think it is more optimal to fuse the lookup, project, and filter into the join as a single operator? 
   In addition, this approach will enable cases where the join condition is more than just __key__ = c, so more conjunctions can be added in the near future.  
   In my opinion, this is the cleanest way to work around the limitation of the remote table join operator without major surgery, while still allowing filters/projects to be pushed and, in the future, richer join conditions to be handled. 
   Again, the proper fix would be to adopt the Calcite Convention pattern, where an operator can be pushed inside another operator when going from logical to physical. In this case, for instance, the Join would be transformed into a new join node that knows how to translate the project and filters within itself. That is roughly what happens now, but without the major surgery that someone has suggested.
   
   







[GitHub] [samza] atoomula commented on a change in pull request #1386: [SAMZA-2557] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
atoomula commented on a change in pull request #1386:
URL: https://github.com/apache/samza/pull/1386#discussion_r455117551



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java
##########
@@ -0,0 +1,214 @@
+/**
+ * Collector of Map and Filter Samza functions that collects the call stack on top of a Remote Table.
+ * This Collector is used by the Join operator, which triggers it when applying the join function after the lookup.
+ *
+ * Note that this is needed because the Remote Table cannot expose a proper {@code MessageStream}.
+ * It is a workaround to minimize the amount of code changes to the current Query Translator {@link org.apache.samza.sql.translator.QueryTranslator}.
+ * In an ideal world, we would use the Calcite planner in the conventional way and combine functions via the translation of RelNodes.

Review comment:
       Ahh, now I get it. If we have a condition such as "(p.__key__ + 1) = pv.profileId", we cannot really solve it with a Calcite rule change; we have to change our translator code (and what you did is one way to do that).







[GitHub] [samza] atoomula commented on a change in pull request #1386: [WIP] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
atoomula commented on a change in pull request #1386:
URL: https://github.com/apache/samza/pull/1386#discussion_r442546064



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
##########
@@ -141,20 +142,28 @@ public RelRoot plan(String query) {
           .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
           .traitDefs(traitDefs)
           .context(Contexts.EMPTY_CONTEXT)
-          .costFactory(null)
+          .programs(
+              Programs.hep(ImmutableList.of(FilterJoinRule.FILTER_ON_JOIN), true, DefaultRelMetadataProvider.INSTANCE))

Review comment:
       Could we remove the FILTER_ON_JOIN optimization? It doesn't work well with remote joins. We should instead use the remote join optimization from the other PR that I sent.







[GitHub] [samza] atoomula commented on a change in pull request #1386: [SAMZA-2557] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
atoomula commented on a change in pull request #1386:
URL: https://github.com/apache/samza/pull/1386#discussion_r448989230



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java
##########
@@ -0,0 +1,214 @@
+/**
+ * Collector of Map and Filter Samza functions that collects the call stack on top of a Remote Table.
+ * This Collector is used by the Join operator, which triggers it when applying the join function after the lookup.
+ *
+ * Note that this is needed because the Remote Table cannot expose a proper {@code MessageStream}.
+ * It is a workaround to minimize the amount of code changes to the current Query Translator {@link org.apache.samza.sql.translator.QueryTranslator}.
+ * In an ideal world, we would use the Calcite planner in the conventional way and combine functions via the translation of RelNodes.

Review comment:
       I still do not get it. Why can't we use query optimization for remote tables and, if we see a filter/projection between the table (we can detect table vs. stream in the optimizer rule) and the join in the Calcite plan, push it up?
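   The alternative proposed here (pulling a filter that sits between a remote table scan and the join above the join) can be illustrated on a toy plan model. The classes below are hypothetical and are not Calcite's `RelNode` or rule API; conditions are plain strings, so the field-reference remapping a real rule would need is ignored.

```java
/** Toy plan node; a hypothetical model, not Calcite's RelNode. */
abstract class ToyPlanNode {
  abstract String explain();
}

final class ToyScan extends ToyPlanNode {
  final String table;
  final boolean remote;

  ToyScan(String table, boolean remote) {
    this.table = table;
    this.remote = remote;
  }

  @Override
  String explain() {
    return (remote ? "RemoteScan(" : "Scan(") + table + ")";
  }
}

final class ToyFilter extends ToyPlanNode {
  final String condition;
  final ToyPlanNode input;

  ToyFilter(String condition, ToyPlanNode input) {
    this.condition = condition;
    this.input = input;
  }

  @Override
  String explain() {
    return "Filter(" + condition + ", " + input.explain() + ")";
  }
}

final class ToyJoin extends ToyPlanNode {
  final boolean inner;
  final ToyPlanNode left;  // stream side
  final ToyPlanNode right; // table side

  ToyJoin(boolean inner, ToyPlanNode left, ToyPlanNode right) {
    this.inner = inner;
    this.left = left;
    this.right = right;
  }

  @Override
  String explain() {
    return (inner ? "InnerJoin(" : "LeftJoin(") + left.explain() + ", " + right.explain() + ")";
  }
}

final class PullUpRemoteFilter {
  private PullUpRemoteFilter() {}

  /** Join(s, Filter(c, RemoteScan t)) becomes Filter(c, Join(s, RemoteScan t)), for inner joins only. */
  static ToyPlanNode rewrite(ToyPlanNode node) {
    if (!(node instanceof ToyJoin)) {
      return node;
    }
    ToyJoin join = (ToyJoin) node;
    if (!join.inner || !(join.right instanceof ToyFilter)) {
      return node;
    }
    ToyFilter filter = (ToyFilter) join.right;
    if (!(filter.input instanceof ToyScan) || !((ToyScan) filter.input).remote) {
      return node;
    }
    return new ToyFilter(filter.condition, new ToyJoin(true, join.left, filter.input));
  }
}
```

   The rewrite is restricted to inner joins on purpose: above a left outer join, the filter would also discard stream rows that should have been null-padded, so the two plans would not be equivalent.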







[GitHub] [samza] b-slim commented on a change in pull request #1386: [SAMZA-2557] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
b-slim commented on a change in pull request #1386:
URL: https://github.com/apache/samza/pull/1386#discussion_r457599989



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
##########
@@ -194,10 +194,12 @@ public static Object convertToAvroObject(Object relObj, Schema schema) {
             .collect(Collectors.toList());
         return avroList;
       case MAP:
-        return ((Map<String, ?>) relObj).entrySet()
-            .stream()
-            .collect(Collectors.toMap(Map.Entry::getKey,
-              e -> convertToAvroObject(e.getValue(), getNonNullUnionSchema(schema).getValueType())));
+        // String keys are not used here because some strings are wrapped in org.apache.avro.util.Utf8.
+        // TODO: looking at the Utf8 code base, it is not immutable, so using it as a map key is asking for trouble!
+        final Map<Object, Object> outputMap = new HashMap<>();

Review comment:
       @atoomula The code as of today already does that; in fact, I ran into it because I changed the key type to String and the tests failed. To avoid turning this into a fix-everything PR, I added the comment and a basic fix for the null case needed by my work. But I agree we need to move away from Avro strings, especially since Utf8 is mutable. 
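   A minimal sketch of the two hazards touched on in this thread, under stated assumptions. First, `Collectors.toMap` throws `NullPointerException` when a value is null, which is one plausible reason to build the output map with an explicit `HashMap`. Second, a mutable key object breaks `HashMap` lookups once it is modified after insertion. `AvroMapSketch` and `MutableText` are hypothetical names; `MutableText` only stands in for `org.apache.avro.util.Utf8` and is not the Avro class, and passing null values through unconverted is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class AvroMapSketch {
  private AvroMapSketch() {}

  /** Converts each value while keeping the original key objects (String or a Utf8-like wrapper). */
  static Map<Object, Object> convertValues(Map<?, ?> input, Function<Object, Object> convertValue) {
    Map<Object, Object> output = new HashMap<>();
    for (Map.Entry<?, ?> entry : input.entrySet()) {
      Object value = entry.getValue();
      // Assumption: null values pass through unconverted; HashMap.put accepts them,
      // whereas Collectors.toMap would throw NullPointerException here.
      output.put(entry.getKey(), value == null ? null : convertValue.apply(value));
    }
    return output;
  }
}

/** Hypothetical mutable key standing in for org.apache.avro.util.Utf8; not the Avro class. */
final class MutableText {
  private String text;

  MutableText(String text) {
    this.text = text;
  }

  void set(String text) {
    this.text = text; // changes hashCode() of a key that may already be stored in a HashMap
  }

  @Override
  public boolean equals(Object other) {
    return other instanceof MutableText && ((MutableText) other).text.equals(text);
  }

  @Override
  public int hashCode() {
    return text.hashCode();
  }

  @Override
  public String toString() {
    return text;
  }
}
```

   After `set` is called on a key that is already in a `HashMap`, the entry can no longer be found, either by the mutated key or by a fresh key with the old contents.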







[GitHub] [samza] atoomula merged pull request #1386: [SAMZA-2557] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
atoomula merged pull request #1386:
URL: https://github.com/apache/samza/pull/1386


   





[GitHub] [samza] atoomula commented on a change in pull request #1386: [SAMZA-2557] Adding support for nested rows access via dot path.

Posted by GitBox <gi...@apache.org>.
atoomula commented on a change in pull request #1386:
URL: https://github.com/apache/samza/pull/1386#discussion_r457129009



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
##########
@@ -110,46 +114,50 @@ private void registerSourceSchemas(SchemaPlus rootSchema) {
   }
 
   public RelRoot plan(String query) {
-    try {
-      Connection connection = DriverManager.getConnection("jdbc:calcite:");
-      CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
-      SchemaPlus rootSchema = calciteConnection.getRootSchema();
-      registerSourceSchemas(rootSchema);
-
-      List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions = udfMetadata.stream()
-          .map(x -> new SamzaSqlScalarFunctionImpl(x))
-          .collect(Collectors.toList());
-
-      final List<RelTraitDef> traitDefs = new ArrayList<>();
-
-      traitDefs.add(ConventionTraitDef.INSTANCE);
-      traitDefs.add(RelCollationTraitDef.INSTANCE);
-
-      List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
-      sqlOperatorTables.add(new SamzaSqlOperatorTable());
-      sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
-
-      // Using lenient so that !=,%,- are allowed.
-      FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
-          .parserConfig(SqlParser.configBuilder()
-              .setLex(Lex.JAVA)
-              .setConformance(SqlConformanceEnum.LENIENT)
-              .setCaseSensitive(false) // Make Udfs case insensitive
-              .build())
-          .defaultSchema(rootSchema)
-          .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
-          .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
-          .traitDefs(traitDefs)
-          .context(Contexts.EMPTY_CONTEXT)
-          .costFactory(null)
-          .build();
-      Planner planner = Frameworks.getPlanner(frameworkConfig);
-
+    SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, false).plus();
+    registerSourceSchemas(rootSchema);
+
+    List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions =
+        udfMetadata.stream().map(x -> new SamzaSqlScalarFunctionImpl(x)).collect(Collectors.toList());
+
+    final List<RelTraitDef> traitDefs = new ArrayList<>();
+
+    traitDefs.add(ConventionTraitDef.INSTANCE);
+    traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+    List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+    sqlOperatorTables.add(new SamzaSqlOperatorTable());
+    sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
+
+    // Using lenient so that !=,%,- are allowed.
+    FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
+        .parserConfig(SqlParser.configBuilder()
+            .setLex(Lex.JAVA)
+            .setConformance(SqlConformanceEnum.LENIENT)
+            .setCaseSensitive(false) // Make Udfs case insensitive
+            .build())
+        .defaultSchema(rootSchema)
+        .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+        .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
+        .traitDefs(traitDefs)
+        .context(Contexts.EMPTY_CONTEXT)
+        .programs(
+            Programs.hep(ImmutableList.of(FilterJoinRule.FILTER_ON_JOIN), true, DefaultRelMetadataProvider.INSTANCE))
+        .build();
+
+    // Planner is a auto closable
+    try (Planner planner = Frameworks.getPlanner(frameworkConfig)) {
       SqlNode sql = planner.parse(query);
       SqlNode validatedSql = planner.validate(sql);
       RelRoot relRoot = planner.rel(validatedSql);
       LOG.info("query plan:\n" + RelOptUtil.toString(relRoot.rel, SqlExplainLevel.ALL_ATTRIBUTES));
-      return relRoot;
+      RelTraitSet relTraitSet = RelTraitSet.createEmpty();

Review comment:
       Could you merge with the latest code? I have merged the query optimization code.

##########
File path: samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
##########
@@ -110,46 +114,50 @@ private void registerSourceSchemas(SchemaPlus rootSchema) {
   }
 
   public RelRoot plan(String query) {
+    SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, false).plus();

Review comment:
       Just curious: is there any reason to set caching to false?

##########
File path: samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
##########
@@ -194,10 +194,12 @@ public static Object convertToAvroObject(Object relObj, Schema schema) {
             .collect(Collectors.toList());
         return avroList;
       case MAP:
-        return ((Map<String, ?>) relObj).entrySet()
-            .stream()
-            .collect(Collectors.toMap(Map.Entry::getKey,
-              e -> convertToAvroObject(e.getValue(), getNonNullUnionSchema(schema).getValueType())));
+        // If you ask why not using String and that is because some strings are Wrapped into org.apache.avro.util.Utf8
+        // TODO looking at the Utf8 code base it is not immutable, having it as a key is calling for trouble!
+        final Map<Object, Object> outputMap = new HashMap<>();

Review comment:
       In what scenarios do you expect the map key to be of Utf8 type? Considering that Avro mandates that map keys be strings, isn't it fair to expect users to convert Utf8s to String?
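One way to act on this suggestion is to normalize at the boundary. The sketch below is a hypothetical helper (`Utf8Normalizer` is not part of Samza) that recursively replaces every `CharSequence`, which includes `Utf8`, with a `String`, both as map keys and as values inside nested maps and lists. It relies on the Avro rule that map keys are strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: recursively replaces CharSequence keys and values with java.lang.String. */
final class Utf8Normalizer {
  private Utf8Normalizer() {}

  static Object normalize(Object value) {
    if (value instanceof CharSequence) {
      // Covers org.apache.avro.util.Utf8 as well as String, StringBuilder, etc.
      return value.toString();
    }
    if (value instanceof Map) {
      Map<String, Object> out = new LinkedHashMap<>();
      for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
        // Avro map keys are strings, so converting every key with String.valueOf is safe.
        out.put(String.valueOf(e.getKey()), normalize(e.getValue()));
      }
      return out;
    }
    if (value instanceof List) {
      List<Object> out = new ArrayList<>();
      for (Object item : (List<?>) value) {
        out.add(normalize(item));
      }
      return out;
    }
    return value; // numbers, booleans, null and other objects pass through unchanged
  }
}
```

Copying into new containers leaves the caller's input untouched; the trade-off is one extra allocation per nested map or list.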

##########
File path: samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.udf;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.lang.reflect.Type;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.calcite.adapter.enumerable.CallImplementor;
+import org.apache.calcite.adapter.enumerable.EnumUtils;
+import org.apache.calcite.adapter.enumerable.NullPolicy;
+import org.apache.calcite.adapter.enumerable.RexImpTable;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.tree.ConstantExpression;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.ExpressionType;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.FunctionParameter;
+import org.apache.calcite.schema.ImplementableFunction;
+import org.apache.calcite.schema.ScalarFunction;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+
+import static org.apache.calcite.schema.impl.ReflectiveFunctionBase.builder;
+
+
+/**
+ * Operator to extract nested Rows or Fields form a struct row type using a dotted path.
+ * The goal of this operator is two-fold.
+ * First it is a temporary fix for https://issues.apache.org/jira/browse/CALCITE-4065 to extract a row from a row.
+ * Second it will enable smooth backward compatible migration from existing udf that relies on legacy row format.

Review comment:
       Is this truly backward compatible with the existing GetNestedField udf? Does this support all the types that are tested in GetSqlFieldUdf, especially map and array? https://github.com/apache/samza/blob/dcd4b558a2c702f5b5a320fdb9d0c3fcadabd09b/samza-sql/src/test/java/org/apache/samza/sql/fn/TestGetSqlFieldUdf.java
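To make the map/array question concrete: a legacy-compatible accessor has to walk a dotted path through maps and lists, not only nested rows. The following is a hypothetical resolver, not `GetNestedField` or `GetSqlFieldUdf`; in particular, the convention that a numeric segment indexes into a list is an assumption and may differ from the syntax those UDFs accept.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical dotted-path resolver over nested Maps and Lists (not Samza's implementation). */
final class DottedPath {
  private DottedPath() {}

  /**
   * Resolves a path such as "address.lines.0" against nested maps and lists.
   * Map segments are looked up by String key; list segments must be integer indexes.
   * Returns null when a segment is missing, out of range, or applied to a scalar.
   */
  static Object get(Object root, String path) {
    Object current = root;
    for (String segment : path.split("\\.")) {
      if (current instanceof Map) {
        current = ((Map<?, ?>) current).get(segment);
      } else if (current instanceof List) {
        List<?> list = (List<?>) current;
        int index;
        try {
          index = Integer.parseInt(segment);
        } catch (NumberFormatException e) {
          return null; // non-numeric segment applied to a list
        }
        if (index < 0 || index >= list.size()) {
          return null;
        }
        current = list.get(index);
      } else {
        return null; // cannot descend into a scalar or null
      }
    }
    return current;
  }
}
```

Because map segments are looked up by `String`, a map whose keys are `Utf8` instances would not match; this is the same key-type issue raised in the `AvroRelConverter` thread.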



