You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/09/21 11:43:36 UTC

[flink] 03/11: [hotfix][table] Extract computeCost in FlinkLogicalJoin to base class

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b1ae9c3228e80c415b30396a7bea01360b54c549
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Jul 5 19:58:05 2018 +0200

    [hotfix][table] Extract computeCost in FlinkLogicalJoin to base class
    
    This commit can be squashed with a following commit after code review
---
 .../plan/nodes/logical/FlinkLogicalJoin.scala      | 26 +++-------
 .../plan/nodes/logical/FlinkLogicalJoinBase.scala  | 59 ++++++++++++++++++++++
 2 files changed, 66 insertions(+), 19 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
index 869ab31..a5ffc90 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
@@ -24,12 +24,9 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.core._
 import org.apache.calcite.rel.logical.LogicalJoin
-import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rex.RexNode
 import org.apache.flink.table.plan.nodes.FlinkConventions
 
-import scala.collection.JavaConverters._
-
 class FlinkLogicalJoin(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
@@ -37,8 +34,13 @@ class FlinkLogicalJoin(
     right: RelNode,
     condition: RexNode,
     joinType: JoinRelType)
-  extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId].asJava, joinType)
-  with FlinkLogicalRel {
+  extends FlinkLogicalJoinBase(
+    cluster,
+    traitSet,
+    left,
+    right,
+    condition,
+    joinType) {
 
   override def copy(
       traitSet: RelTraitSet,
@@ -50,20 +52,6 @@ class FlinkLogicalJoin(
 
     new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType)
   }
-
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-    val leftRowCnt = metadata.getRowCount(getLeft)
-    val leftRowSize = estimateRowSize(getLeft.getRowType)
-
-    val rightRowCnt = metadata.getRowCount(getRight)
-    val rightRowSize = estimateRowSize(getRight.getRowType)
-
-    val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
-    val cpuCost = leftRowCnt + rightRowCnt
-    val rowCnt = leftRowCnt + rightRowCnt
-
-    planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
-  }
 }
 
 private class FlinkLogicalJoinConverter
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoinBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoinBase.scala
new file mode 100644
index 0000000..7b5c266
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoinBase.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core._
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+
+import scala.collection.JavaConverters._
+
+abstract class FlinkLogicalJoinBase(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    left: RelNode,
+    right: RelNode,
+    condition: RexNode,
+    joinType: JoinRelType)
+  extends Join(
+    cluster,
+    traitSet,
+    left,
+    right,
+    condition,
+    Set.empty[CorrelationId].asJava,
+    joinType)
+  with FlinkLogicalRel {
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val leftRowCnt = metadata.getRowCount(getLeft)
+    val leftRowSize = estimateRowSize(getLeft.getRowType)
+
+    val rightRowCnt = metadata.getRowCount(getRight)
+    val rightRowSize = estimateRowSize(getRight.getRowType)
+
+    val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+    val cpuCost = leftRowCnt + rightRowCnt
+    val rowCnt = leftRowCnt + rightRowCnt
+
+    planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
+  }
+}