You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/25 13:55:39 UTC

[flink] branch master updated (817da8a -> 9cd9593)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 817da8a  [FLINK-13353][table-planner] Remove 2 args constructor in REPLACE expression
     new 4817dd9  [FLINK-13116][table-planner-blink] Fix Catalog statistics is not bridged to blink planner
     new 9cd9593  [FLINK-13116][table-planner-blink] Exclude RAT for explain and digest resource files

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/table/plan/stats/TableStats.java  |   1 +
 .../planner/catalog/DatabaseCalciteSchema.java     |  41 ++++---
 .../util/CatalogTableStatisticsConverter.java      | 120 +++++++++++++++++++++
 .../table/planner/plan/stats/FlinkStatistic.scala  |  29 +++--
 .../planner/catalog/CatalogStatisticsTest.java     | 100 +++++++++++++++++
 .../digest/testGetDigestWithDynamicFunction.out    |  17 ---
 .../testGetDigestWithDynamicFunctionView.out       |  17 ---
 .../resources/explain/testGetStatsFromCatalog.out  |  32 ++++++
 .../planner/plan/metadata/MetadataTestUtil.scala   |   2 +-
 .../planner/plan/utils/RelDigestUtilTest.scala     |  31 +-----
 .../flink/table/planner/utils/TableTestBase.scala  |  24 +++--
 pom.xml                                            |   2 +
 12 files changed, 321 insertions(+), 95 deletions(-)
 create mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/CatalogTableStatisticsConverter.java
 create mode 100644 flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java
 create mode 100644 flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out


[flink] 02/02: [FLINK-13116][table-planner-blink] Exclude RAT for explain and digest resource files

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9cd95930bc89c2a4d4e8ee5b7e0c7394eca22b73
Author: Jark Wu <im...@gmail.com>
AuthorDate: Thu Jul 25 20:29:16 2019 +0800

    [FLINK-13116][table-planner-blink] Exclude RAT for explain and digest resource files
---
 .../planner/catalog/CatalogStatisticsTest.java     |  2 +-
 .../digest/testGetDigestWithDynamicFunction.out    | 17 -----------------
 .../testGetDigestWithDynamicFunctionView.out       | 17 -----------------
 .../{ => explain}/testGetStatsFromCatalog.out      | 17 -----------------
 .../flink/table/planner/utils/TableTestBase.scala  | 22 +---------------------
 pom.xml                                            |  2 ++
 6 files changed, 4 insertions(+), 73 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java
index 0049e9f..7bd6632 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java
@@ -76,7 +76,7 @@ public class CatalogStatisticsTest {
 		Table table = tEnv.sqlQuery("select * from T1, T2 where T1.s3 = T2.s3");
 		String result = tEnv.explain(table);
 		// T1 is broadcast side
-		String expected = TableTestUtil.readFromResource("/testGetStatsFromCatalog.out");
+		String expected = TableTestUtil.readFromResource("/explain/testGetStatsFromCatalog.out");
 		assertEquals(expected, TableTestUtil.replaceStageId(result));
 	}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunction.out b/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunction.out
index ce5f0cd..ac3b44b 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunction.out
+++ b/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunction.out
@@ -1,20 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 LogicalIntersect(all=[false],rowType=[RecordType(INTEGER random)])
 LogicalIntersect(all=[false],rowType=[RecordType(INTEGER random)])
 LogicalProject(random=[$0],rowType=[RecordType(INTEGER random)])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunctionView.out b/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunctionView.out
index 52e47ee..c965d30 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunctionView.out
+++ b/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunctionView.out
@@ -1,20 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 LogicalIntersect(all=[false],rowType=[RecordType(INTEGER random)])
 LogicalIntersect(all=[false],rowType=[RecordType(INTEGER random)])
 LogicalProject(random=[$0],rowType=[RecordType(INTEGER random)])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/testGetStatsFromCatalog.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out
similarity index 67%
rename from flink-table/flink-table-planner-blink/src/test/resources/testGetStatsFromCatalog.out
rename to flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out
index 7798738..bc53a0a 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/testGetStatsFromCatalog.out
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out
@@ -1,20 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 == Abstract Syntax Tree ==
 LogicalProject(b1=[$0], l2=[$1], s3=[$2], d4=[$3], dd5=[$4], b10=[$5], l20=[$6], s30=[$7], d40=[$8], dd50=[$9])
 +- LogicalFilter(condition=[=($2, $7)])
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 50f393f..672a71e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -1067,27 +1067,7 @@ object TableTestUtil {
 
   def readFromResource(path: String): String = {
     val inputStream = getClass.getResource(path).getFile
-    val fullContent = Source.fromFile(inputStream).mkString
-    val license =
-      """/*
-        | * Licensed to the Apache Software Foundation (ASF) under one
-        | * or more contributor license agreements.  See the NOTICE file
-        | * distributed with this work for additional information
-        | * regarding copyright ownership.  The ASF licenses this file
-        | * to you under the Apache License, Version 2.0 (the
-        | * "License"); you may not use this file except in compliance
-        | * with the License.  You may obtain a copy of the License at
-        | *
-        | *     http://www.apache.org/licenses/LICENSE-2.0
-        | *
-        | * Unless required by applicable law or agreed to in writing, software
-        | * distributed under the License is distributed on an "AS IS" BASIS,
-        | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-        | * See the License for the specific language governing permissions and
-        | * limitations under the License.
-        | */
-        |""".stripMargin
-    fullContent.replace(license, "")
+    Source.fromFile(inputStream).mkString
   }
 
   /**
diff --git a/pom.xml b/pom.xml
index 3f7cd8b..ced31af 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1334,6 +1334,8 @@ under the License.
 						<exclude>flink-formats/flink-avro/src/test/resources/avro/*.avsc</exclude>
 						<exclude>out/test/flink-avro/avro/user.avsc</exclude>
 						<exclude>flink-table/flink-table-planner/src/test/scala/resources/*.out</exclude>
+						<exclude>flink-table/flink-table-planner-blink/src/test/resources/digest/*.out</exclude>
+						<exclude>flink-table/flink-table-planner-blink/src/test/resources/explain/*.out</exclude>
 						<exclude>flink-yarn/src/test/resources/krb5.keytab</exclude>
 						<exclude>flink-end-to-end-tests/test-scripts/test-data/*</exclude>
 						<exclude>flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/keystore.jks</exclude>


[flink] 01/02: [FLINK-13116][table-planner-blink] Fix Catalog statistics is not bridged to blink planner

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4817dd9df16c730e2e90f667f51f74ee40c3dd6c
Author: godfreyhe <go...@163.com>
AuthorDate: Thu Jul 11 18:33:54 2019 +0800

    [FLINK-13116][table-planner-blink] Fix Catalog statistics is not bridged to blink planner
    
    This closes #9083
---
 .../apache/flink/table/plan/stats/TableStats.java  |   1 +
 .../planner/catalog/DatabaseCalciteSchema.java     |  41 ++++---
 .../util/CatalogTableStatisticsConverter.java      | 120 +++++++++++++++++++++
 .../table/planner/plan/stats/FlinkStatistic.scala  |  29 +++--
 .../planner/catalog/CatalogStatisticsTest.java     | 100 +++++++++++++++++
 .../src/test/resources/testGetStatsFromCatalog.out |  49 +++++++++
 .../planner/plan/metadata/MetadataTestUtil.scala   |   2 +-
 .../planner/plan/utils/RelDigestUtilTest.scala     |  31 +-----
 .../flink/table/planner/utils/TableTestBase.scala  |  44 ++++++--
 9 files changed, 356 insertions(+), 61 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java
index 9066ae8..453593a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java
@@ -28,6 +28,7 @@ import java.util.Map;
  */
 @PublicEvolving
 public final class TableStats {
+	public static final TableStats UNKNOWN = new TableStats(-1, new HashMap<>());
 
 	/**
 	 * cardinality of table.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
index d9fdcd4..1bb6644 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
@@ -28,10 +28,13 @@ import org.apache.flink.table.catalog.QueryOperationCatalogView;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.plan.stats.TableStats;
 import org.apache.flink.table.planner.operations.DataStreamQueryOperation;
 import org.apache.flink.table.planner.operations.RichTableSourceQueryOperation;
 import org.apache.flink.table.planner.plan.schema.TableSinkTable;
@@ -54,6 +57,7 @@ import java.util.Optional;
 import java.util.Set;
 
 import static java.lang.String.format;
+import static org.apache.flink.table.util.CatalogTableStatisticsConverter.convertToTableStats;
 
 /**
  * A mapping between Flink catalog's database and Calcite's schema.
@@ -97,7 +101,7 @@ class DatabaseCalciteSchema extends FlinkSchema {
 				}
 				return QueryOperationCatalogViewTable.createCalciteTable(view);
 			} else if (table instanceof ConnectorCatalogTable) {
-				return convertConnectorTable((ConnectorCatalogTable<?, ?>) table);
+				return convertConnectorTable((ConnectorCatalogTable<?, ?>) table, tablePath);
 			} else if (table instanceof CatalogTable) {
 				return convertCatalogTable(tablePath, (CatalogTable) table);
 			} else {
@@ -114,25 +118,32 @@ class DatabaseCalciteSchema extends FlinkSchema {
 		}
 	}
 
-	private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table) {
-		Optional<TableSourceTable> tableSourceTable = table.getTableSource()
-			.map(tableSource -> {
-				if (!(tableSource instanceof StreamTableSource ||
+	private Table convertConnectorTable(
+			ConnectorCatalogTable<?, ?> table,
+			ObjectPath tablePath) throws TableNotExistException {
+		if (table.getTableSource().isPresent()) {
+			TableSource<?> tableSource = table.getTableSource().get();
+			if (!(tableSource instanceof StreamTableSource ||
 					tableSource instanceof LookupableTableSource)) {
-					throw new TableException(
+				throw new TableException(
 						"Only StreamTableSource and LookupableTableSource can be used in Blink planner.");
-				}
-				if (!isStreamingMode && tableSource instanceof StreamTableSource &&
+			}
+			if (!isStreamingMode && tableSource instanceof StreamTableSource &&
 					!((StreamTableSource<?>) tableSource).isBounded()) {
-					throw new TableException("Only bounded StreamTableSource can be used in batch mode.");
-				}
-				return new TableSourceTable<>(
+				throw new TableException("Only bounded StreamTableSource can be used in batch mode.");
+			}
+
+			TableStats tableStats = TableStats.UNKNOWN;
+			// TODO: support stats for partitioned tables
+			if (!table.isPartitioned()) {
+				CatalogTableStatistics tableStatistics = catalog.getTableStatistics(tablePath);
+				CatalogColumnStatistics columnStatistics = catalog.getTableColumnStatistics(tablePath);
+				tableStats = convertToTableStats(tableStatistics, columnStatistics);
+			}
+			return new TableSourceTable<>(
 					tableSource,
 					isStreamingMode,
-					FlinkStatistic.UNKNOWN());
-			});
-		if (tableSourceTable.isPresent()) {
-			return tableSourceTable.get();
+					FlinkStatistic.builder().tableStats(tableStats).build());
 		} else {
 			Optional<TableSinkTable> tableSinkTable = table.getTableSink()
 				.map(tableSink -> new TableSinkTable<>(
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/CatalogTableStatisticsConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/CatalogTableStatisticsConverter.java
new file mode 100644
index 0000000..8303a4e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/util/CatalogTableStatisticsConverter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.util;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDouble;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.plan.stats.ColumnStats;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class for converting {@link CatalogTableStatistics} to {@link TableStats}.
+ */
+public class CatalogTableStatisticsConverter {
+
+	public static TableStats convertToTableStats(
+			CatalogTableStatistics tableStatistics,
+			CatalogColumnStatistics columnStatistics) {
+		if (tableStatistics == null || tableStatistics.equals(CatalogTableStatistics.UNKNOWN)) {
+			return TableStats.UNKNOWN;
+		}
+
+		long rowCount = tableStatistics.getRowCount();
+		Map<String, ColumnStats> columnStatsMap = null;
+		if (columnStatistics != null && !columnStatistics.equals(CatalogColumnStatistics.UNKNOWN)) {
+			columnStatsMap = convertToColumnStatsMap(columnStatistics.getColumnStatisticsData());
+		}
+		if (columnStatsMap == null) {
+			columnStatsMap = new HashMap<>();
+		}
+		return new TableStats(rowCount, columnStatsMap);
+	}
+
+	private static Map<String, ColumnStats> convertToColumnStatsMap(
+			Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData) {
+		Map<String, ColumnStats> columnStatsMap = new HashMap<>();
+		for (Map.Entry<String, CatalogColumnStatisticsDataBase> entry : columnStatisticsData.entrySet()) {
+			ColumnStats columnStats = convertToColumnStats(entry.getValue());
+			columnStatsMap.put(entry.getKey(), columnStats);
+		}
+		return columnStatsMap;
+	}
+
+	private static ColumnStats convertToColumnStats(
+			CatalogColumnStatisticsDataBase columnStatisticsData) {
+		Long ndv = null;
+		Long nullCount = columnStatisticsData.getNullCount();
+		Double avgLen = null;
+		Integer maxLen = null;
+		Number max = null;
+		Number min = null;
+		if (columnStatisticsData instanceof CatalogColumnStatisticsDataBoolean) {
+			CatalogColumnStatisticsDataBoolean booleanData = (CatalogColumnStatisticsDataBoolean) columnStatisticsData;
+			avgLen = 1.0;
+			maxLen = 1;
+			if ((booleanData.getFalseCount() == 0 && booleanData.getTrueCount() > 0) ||
+					(booleanData.getFalseCount() > 0 && booleanData.getTrueCount() == 0)) {
+				ndv = 1L;
+			} else {
+				ndv = 2L;
+			}
+		} else if (columnStatisticsData instanceof CatalogColumnStatisticsDataLong) {
+			CatalogColumnStatisticsDataLong longData = (CatalogColumnStatisticsDataLong) columnStatisticsData;
+			ndv = longData.getNdv();
+			avgLen = 8.0;
+			maxLen = 8;
+			max = longData.getMax();
+			min = longData.getMin();
+		} else if (columnStatisticsData instanceof CatalogColumnStatisticsDataDouble) {
+			CatalogColumnStatisticsDataDouble doubleData = (CatalogColumnStatisticsDataDouble) columnStatisticsData;
+			ndv = doubleData.getNdv();
+			avgLen = 8.0;
+			maxLen = 8;
+			max = doubleData.getMax();
+			min = doubleData.getMin();
+		} else if (columnStatisticsData instanceof CatalogColumnStatisticsDataString) {
+			CatalogColumnStatisticsDataString strData = (CatalogColumnStatisticsDataString) columnStatisticsData;
+			ndv = strData.getNdv();
+			avgLen = strData.getAvgLength();
+			maxLen = (int) strData.getMaxLength();
+		} else if (columnStatisticsData instanceof CatalogColumnStatisticsDataBinary) {
+			CatalogColumnStatisticsDataBinary binaryData = (CatalogColumnStatisticsDataBinary) columnStatisticsData;
+			avgLen = binaryData.getAvgLength();
+			maxLen = (int) binaryData.getMaxLength();
+		} else if (columnStatisticsData instanceof CatalogColumnStatisticsDataDate) {
+			CatalogColumnStatisticsDataDate dateData = (CatalogColumnStatisticsDataDate) columnStatisticsData;
+			ndv = dateData.getNdv();
+		} else {
+			throw new TableException("Unsupported CatalogColumnStatisticsDataBase: " +
+					columnStatisticsData.getClass().getCanonicalName());
+		}
+		return new ColumnStats(ndv, nullCount, avgLen, maxLen, max, min);
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala
index 028fc7d..c69ad05 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala
@@ -39,6 +39,7 @@ class FlinkStatistic(
     relModifiedMonotonicity: RelModifiedMonotonicity = null)
   extends Statistic {
 
+  require(tableStats != null, "tableStats should not be null")
   require(uniqueKeys == null || !uniqueKeys.exists(keys => keys == null || keys.isEmpty),
     "uniqueKeys contains invalid elements!")
 
@@ -56,7 +57,7 @@ class FlinkStatistic(
     * @return The stats of the specified column.
     */
   def getColumnStats(columnName: String): ColumnStats = {
-    if (tableStats != null && tableStats.getColumnStats != null) {
+    if (tableStats != TableStats.UNKNOWN && tableStats.getColumnStats != null) {
       tableStats.getColumnStats.get(columnName)
     } else {
       null
@@ -80,8 +81,14 @@ class FlinkStatistic(
     * @return The number of rows of the table.
     */
   override def getRowCount: Double = {
-    if (tableStats != null) {
-      tableStats.getRowCount.toDouble
+    if (tableStats != TableStats.UNKNOWN) {
+      val rowCount = tableStats.getRowCount.toDouble
+      // rowCount must be a non-negative number
+      if (rowCount >= 0) {
+        rowCount
+      } else {
+        null
+      }
     } else {
       null
     }
@@ -111,7 +118,7 @@ class FlinkStatistic(
 
   override def toString: String = {
     val builder = new StringBuilder
-    if (tableStats != null) {
+    if (tableStats != TableStats.UNKNOWN) {
       builder.append(s"TableStats: " +
         s"{rowCount: ${tableStats.getRowCount}, " +
         s"columnStats: ${tableStats.getColumnStats}}, ")
@@ -137,16 +144,20 @@ class FlinkStatistic(
 object FlinkStatistic {
 
   /** Represents a FlinkStatistic that knows nothing about a table */
-  val UNKNOWN: FlinkStatistic = new FlinkStatistic(null)
+  val UNKNOWN: FlinkStatistic = new FlinkStatistic(TableStats.UNKNOWN)
 
   class Builder {
 
-    private var tableStats: TableStats = _
+    private var tableStats: TableStats = TableStats.UNKNOWN
     private var uniqueKeys: util.Set[_ <: util.Set[String]] = _
     private var relModifiedMonotonicity: RelModifiedMonotonicity = _
 
     def tableStats(tableStats: TableStats): Builder = {
-      this.tableStats = tableStats
+      if (tableStats != null) {
+        this.tableStats = tableStats
+      } else {
+        this.tableStats = TableStats.UNKNOWN
+      }
       this
     }
 
@@ -169,7 +180,9 @@ object FlinkStatistic {
     }
 
     def build(): FlinkStatistic = {
-      if (tableStats == null && uniqueKeys == null && relModifiedMonotonicity == null) {
+      if (tableStats == TableStats.UNKNOWN &&
+        uniqueKeys == null &&
+        relModifiedMonotonicity == null) {
         UNKNOWN
       } else {
         new FlinkStatistic(tableStats, uniqueKeys, relModifiedMonotonicity)
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java
new file mode 100644
index 0000000..0049e9f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.catalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDouble;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.catalog.stats.Date;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+import org.apache.flink.table.planner.utils.TestTableSource;
+import org.apache.flink.table.types.DataType;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Test for Catalog Statistics.
+ */
+public class CatalogStatisticsTest {
+
+	private TableSchema tableSchema = TableSchema.builder().fields(
+			new String[] { "b1", "l2", "s3", "d4", "dd5" },
+			new DataType[] { DataTypes.BOOLEAN(), DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.DATE(),
+					DataTypes.DOUBLE() }
+	).build();
+
+	@Test
+	public void testGetStatsFromCatalog() throws Exception {
+		EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
+		TableEnvironment tEnv = TableEnvironment.create(settings);
+		tEnv.registerTableSource("T1", new TestTableSource(true, tableSchema));
+		tEnv.registerTableSource("T2", new TestTableSource(true, tableSchema));
+		Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).orElse(null);
+		assertNotNull(catalog);
+
+		catalog.alterTableStatistics(ObjectPath.fromString("default_database.T1"),
+				new CatalogTableStatistics(100, 10, 1000L, 2000L), true);
+		catalog.alterTableStatistics(ObjectPath.fromString("default_database.T2"),
+				new CatalogTableStatistics(100000000, 1000, 1000000000L, 2000000000L), true);
+		catalog.alterTableColumnStatistics(ObjectPath.fromString("default_database.T1"), createColumnStats(), true);
+		catalog.alterTableColumnStatistics(ObjectPath.fromString("default_database.T2"), createColumnStats(), true);
+
+		Table table = tEnv.sqlQuery("select * from T1, T2 where T1.s3 = T2.s3");
+		String result = tEnv.explain(table);
+		// T1 is broadcast side
+		String expected = TableTestUtil.readFromResource("/testGetStatsFromCatalog.out");
+		assertEquals(expected, TableTestUtil.replaceStageId(result));
+	}
+
+	private CatalogColumnStatistics createColumnStats() {
+		CatalogColumnStatisticsDataBoolean booleanColStats = new CatalogColumnStatisticsDataBoolean(55L, 45L, 5L);
+		CatalogColumnStatisticsDataLong longColStats = new CatalogColumnStatisticsDataLong(-123L, 763322L, 23L, 79L);
+		CatalogColumnStatisticsDataString stringColStats = new CatalogColumnStatisticsDataString(152L, 43.5D, 20L, 0L);
+		CatalogColumnStatisticsDataDate dateColStats =
+				new CatalogColumnStatisticsDataDate(new Date(71L), new Date(17923L), 1321, 0L);
+		CatalogColumnStatisticsDataDouble doubleColStats =
+				new CatalogColumnStatisticsDataDouble(-123.35D, 7633.22D, 23L, 79L);
+		Map<String, CatalogColumnStatisticsDataBase> colStatsMap = new HashMap<>(6);
+		colStatsMap.put("b1", booleanColStats);
+		colStatsMap.put("l2", longColStats);
+		colStatsMap.put("s3", stringColStats);
+		colStatsMap.put("d4", dateColStats);
+		colStatsMap.put("dd5", doubleColStats);
+		return new CatalogColumnStatistics(colStatsMap);
+	}
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/testGetStatsFromCatalog.out b/flink-table/flink-table-planner-blink/src/test/resources/testGetStatsFromCatalog.out
new file mode 100644
index 0000000..7798738
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/testGetStatsFromCatalog.out
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+== Abstract Syntax Tree ==
+LogicalProject(b1=[$0], l2=[$1], s3=[$2], d4=[$3], dd5=[$4], b10=[$5], l20=[$6], s30=[$7], d40=[$8], dd50=[$9])
++- LogicalFilter(condition=[=($2, $7)])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(b1, l2, s3, d4, dd5)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(b1, l2, s3, d4, dd5)]]])
+
+== Optimized Logical Plan ==
+HashJoin(joinType=[InnerJoin], where=[=(s3, s30)], select=[b1, l2, s3, d4, dd5, b10, l20, s30, d40, dd50], isBroadcast=[true], build=[left])
+:- Exchange(distribution=[broadcast])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(b1, l2, s3, d4, dd5)]]], fields=[b1, l2, s3, d4, dd5])
++- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(b1, l2, s3, d4, dd5)]]], fields=[b1, l2, s3, d4, dd5])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+	 : Operator
+		content : SourceConversion(table:Buffer(default_catalog, default_database, T1, source: [TestTableSource(b1, l2, s3, d4, dd5)]), fields:(b1, l2, s3, d4, dd5))
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : SourceConversion(table:Buffer(default_catalog, default_database, T2, source: [TestTableSource(b1, l2, s3, d4, dd5)]), fields:(b1, l2, s3, d4, dd5))
+			ship_strategy : FORWARD
+
+			 : Operator
+				content : HashJoin(where: (s3 = s30), buildLeft)
+				ship_strategy : BROADCAST
+
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
index bce8d00..bb33828 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
@@ -90,7 +90,7 @@ object MetadataTestUtil {
         BasicTypeInfo.DOUBLE_TYPE_INFO,
         BasicTypeInfo.INT_TYPE_INFO))
 
-    getDataStreamTable(schema, new FlinkStatistic(null))
+    getDataStreamTable(schema, new FlinkStatistic(TableStats.UNKNOWN))
   }
 
   private def createMyTable1(): DataStreamTable[BaseRow] = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RelDigestUtilTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RelDigestUtilTest.scala
index f3a720d..9e003ed 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RelDigestUtilTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RelDigestUtilTest.scala
@@ -29,7 +29,6 @@ import org.junit.Assert.assertEquals
 import org.junit.{Before, Test}
 
 import scala.collection.Seq
-import scala.io.Source
 
 class RelDigestUtilTest {
 
@@ -59,7 +58,7 @@ class RelDigestUtilTest {
         |(SELECT id AS random FROM MyTable ORDER BY rand() LIMIT 1)
       """.stripMargin)
     val rel = TableTestUtil.toRelNode(table)
-    val expected = readFromResource("testGetDigestWithDynamicFunction.out")
+    val expected = TableTestUtil.readFromResource("/digest/testGetDigestWithDynamicFunction.out")
     assertEquals(expected, RelDigestUtil.getDigest(rel))
   }
 
@@ -76,33 +75,9 @@ class RelDigestUtilTest {
         |(SELECT * FROM MyView)
       """.stripMargin)
     val rel = TableTestUtil.toRelNode(table).accept(new ExpandTableScanShuttle())
-    val expected = readFromResource("testGetDigestWithDynamicFunctionView.out")
+    val expected = TableTestUtil.readFromResource(
+      "/digest/testGetDigestWithDynamicFunctionView.out")
     assertEquals(expected, RelDigestUtil.getDigest(rel))
   }
 
-  private def readFromResource(name: String): String = {
-    val inputStream = getClass.getResource("/digest/" + name).getFile
-    val fullContent = Source.fromFile(inputStream).mkString
-    val license =
-      """/*
-        | * Licensed to the Apache Software Foundation (ASF) under one
-        | * or more contributor license agreements.  See the NOTICE file
-        | * distributed with this work for additional information
-        | * regarding copyright ownership.  The ASF licenses this file
-        | * to you under the Apache License, Version 2.0 (the
-        | * "License"); you may not use this file except in compliance
-        | * with the License.  You may obtain a copy of the License at
-        | *
-        | *     http://www.apache.org/licenses/LICENSE-2.0
-        | *
-        | * Unless required by applicable law or agreed to in writing, software
-        | * distributed under the License is distributed on an "AS IS" BASIS,
-        | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-        | * See the License for the specific language governing permissions and
-        | * limitations under the License.
-        | */
-        |""".stripMargin
-    fullContent.replace(license, "")
-  }
-
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index a5b9b17..50f393f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -68,6 +68,7 @@ import org.junit.rules.{ExpectedException, TestName}
 import _root_.java.util
 
 import _root_.scala.collection.JavaConversions._
+import _root_.scala.io.Source
 
 /**
   * Test base for testing Table API / SQL plans.
@@ -408,7 +409,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
     } else {
       explainResult
     }
-    assertEqualsOrExpand("explain", replaceStageId(actual), expand = false)
+    assertEqualsOrExpand("explain", TableTestUtil.replaceStageId(actual), expand = false)
   }
 
   protected def getOptimizedPlan(
@@ -442,14 +443,6 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
   }
 
   /**
-    * Stage {id} is ignored, because id keeps incrementing in test class
-    * while StreamExecutionEnvironment is up
-    */
-  protected def replaceStageId(s: String): String = {
-    s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
-  }
-
-  /**
     * ignore estimated cost, because it may be unstable.
     */
   protected def replaceEstimatedCost(s: String): String = {
@@ -1071,4 +1064,37 @@ object TableTestUtil {
     createTableMethod.setAccessible(true)
     createTableMethod.invoke(tEnv, queryOperation).asInstanceOf[Table]
   }
+
+  /** Reads the classpath resource `path` as text, with the ASF license header removed. */
+  def readFromResource(path: String): String = {
+    val source = Source.fromURL(getClass.getResource(path)) // URL-based: no path decoding
+    val fullContent = try source.mkString finally source.close() // never leak the stream
+    val license = """/*
+        | * Licensed to the Apache Software Foundation (ASF) under one
+        | * or more contributor license agreements.  See the NOTICE file
+        | * distributed with this work for additional information
+        | * regarding copyright ownership.  The ASF licenses this file
+        | * to you under the Apache License, Version 2.0 (the
+        | * "License"); you may not use this file except in compliance
+        | * with the License.  You may obtain a copy of the License at
+        | *
+        | *     http://www.apache.org/licenses/LICENSE-2.0
+        | *
+        | * Unless required by applicable law or agreed to in writing, software
+        | * distributed under the License is distributed on an "AS IS" BASIS,
+        | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+        | * See the License for the specific language governing permissions and
+        | * limitations under the License.
+        | */
+        |""".stripMargin
+    fullContent.replace(license, "")
+  }
+
+  /**
+    * Converts CRLF line endings to LF and removes every "Stage {id}" marker; the id is
+    * ignored because it keeps incrementing in a test class while the environment is up.
+    */
+  def replaceStageId(s: String): String =
+    s.replaceAll("\\r\\n", "\n")
+      .replaceAll("Stage \\d+", "")
 }