You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/09/21 11:43:36 UTC
[flink] 03/11: [hotfix][table] Extract computeCost in
FlinkLogicalJoin to base class
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b1ae9c3228e80c415b30396a7bea01360b54c549
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Jul 5 19:58:05 2018 +0200
[hotfix][table] Extract computeCost in FlinkLogicalJoin to base class
This commit can be squashed with a following commit after code review
---
.../plan/nodes/logical/FlinkLogicalJoin.scala | 26 +++-------
.../plan/nodes/logical/FlinkLogicalJoinBase.scala | 59 ++++++++++++++++++++++
2 files changed, 66 insertions(+), 19 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
index 869ab31..a5ffc90 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
@@ -24,12 +24,9 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core._
import org.apache.calcite.rel.logical.LogicalJoin
-import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rex.RexNode
import org.apache.flink.table.plan.nodes.FlinkConventions
-import scala.collection.JavaConverters._
-
class FlinkLogicalJoin(
cluster: RelOptCluster,
traitSet: RelTraitSet,
@@ -37,8 +34,13 @@ class FlinkLogicalJoin(
right: RelNode,
condition: RexNode,
joinType: JoinRelType)
- extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId].asJava, joinType)
- with FlinkLogicalRel {
+ extends FlinkLogicalJoinBase(
+ cluster,
+ traitSet,
+ left,
+ right,
+ condition,
+ joinType) {
override def copy(
traitSet: RelTraitSet,
@@ -50,20 +52,6 @@ class FlinkLogicalJoin(
new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType)
}
-
- override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
- val leftRowCnt = metadata.getRowCount(getLeft)
- val leftRowSize = estimateRowSize(getLeft.getRowType)
-
- val rightRowCnt = metadata.getRowCount(getRight)
- val rightRowSize = estimateRowSize(getRight.getRowType)
-
- val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
- val cpuCost = leftRowCnt + rightRowCnt
- val rowCnt = leftRowCnt + rightRowCnt
-
- planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
- }
}
private class FlinkLogicalJoinConverter
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoinBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoinBase.scala
new file mode 100644
index 0000000..7b5c266
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoinBase.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core._
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+
+import scala.collection.JavaConverters._
+
+abstract class FlinkLogicalJoinBase(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ left: RelNode,
+ right: RelNode,
+ condition: RexNode,
+ joinType: JoinRelType)
+ extends Join(
+ cluster,
+ traitSet,
+ left,
+ right,
+ condition,
+ Set.empty[CorrelationId].asJava,
+ joinType)
+ with FlinkLogicalRel {
+
+ override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+ val leftRowCnt = metadata.getRowCount(getLeft)
+ val leftRowSize = estimateRowSize(getLeft.getRowType)
+
+ val rightRowCnt = metadata.getRowCount(getRight)
+ val rightRowSize = estimateRowSize(getRight.getRowType)
+
+ val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+ val cpuCost = leftRowCnt + rightRowCnt
+ val rowCnt = leftRowCnt + rightRowCnt
+
+ planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
+ }
+}