You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/05/09 09:40:18 UTC

[flink] branch master updated: [FLINK-12392][table-planner-blink] Port FlinkRelMetadataQuery into Java to avoid compiling error with Scala 2.12

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ca8d9ca  [FLINK-12392][table-planner-blink] Port FlinkRelMetadataQuery into Java to avoid compiling error with Scala 2.12
ca8d9ca is described below

commit ca8d9cab87fa082a0939ce51b8369b75691df3a4
Author: godfreyhe <go...@163.com>
AuthorDate: Thu May 9 11:43:36 2019 +0800

    [FLINK-12392][table-planner-blink] Port FlinkRelMetadataQuery into Java to avoid compiling error with Scala 2.12
    
    This closes #8376
---
 .../table/plan/metadata/FlinkRelMetadataQuery.java | 250 +++++++++++++++++++++
 .../plan/metadata/FlinkRelMetadataQuery.scala      | 220 ------------------
 .../flink/table/runtime/utils/StreamTestSink.scala |   8 +-
 3 files changed, 254 insertions(+), 224 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.java
new file mode 100644
index 0000000..1ede114
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.metadata;
+
+import org.apache.flink.table.plan.stats.ValueInterval;
+import org.apache.flink.table.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.plan.trait.RelModifiedMonotonicity;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableBitSet;
+
+/**
+ * A RelMetadataQuery that defines extended metadata handlers in Flink,
+ * e.g. ColumnInterval, ColumnNullCount.
+ */
+public class FlinkRelMetadataQuery extends RelMetadataQuery {
+
+	protected static final FlinkRelMetadataQuery PROTOTYPE = new FlinkRelMetadataQuery(false);
+
+	private FlinkMetadata.ColumnInterval.Handler columnIntervalHandler;
+	private FlinkMetadata.FilteredColumnInterval.Handler filteredColumnInterval;
+	private FlinkMetadata.ColumnNullCount.Handler columnNullCountHandler;
+	private FlinkMetadata.ColumnOriginNullCount.Handler columnOriginNullCountHandler;
+	private FlinkMetadata.UniqueGroups.Handler uniqueGroupsHandler;
+	private FlinkMetadata.FlinkDistribution.Handler distributionHandler;
+	private FlinkMetadata.ModifiedMonotonicity.Handler modifiedMonotonicityHandler;
+
+	/**
+	 * Returns an instance of FlinkRelMetadataQuery. It ensures that cycles do not
+	 * occur while computing metadata.
+	 */
+	public static FlinkRelMetadataQuery instance() {
+		return new FlinkRelMetadataQuery();
+	}
+
+	/**
+	 * Reuses the input metadataQuery instance if it can be cast to FlinkRelMetadataQuery,
+	 * or creates a new one if not.
+	 *
+	 * @param mq the metadataQuery to try to reuse
+	 * @return a FlinkRelMetadataQuery instance
+	 */
+	public static FlinkRelMetadataQuery reuseOrCreate(RelMetadataQuery mq) {
+		if (mq instanceof FlinkRelMetadataQuery) {
+			return (FlinkRelMetadataQuery) mq;
+		} else {
+			return instance();
+		}
+	}
+
+	private FlinkRelMetadataQuery(
+			JaninoRelMetadataProvider metadataProvider,
+			RelMetadataQuery prototype) {
+		super(metadataProvider, prototype);
+	}
+
+	private FlinkRelMetadataQuery() {
+		super(RelMetadataQuery.THREAD_PROVIDERS.get(), RelMetadataQuery.EMPTY);
+		this.columnIntervalHandler = PROTOTYPE.columnIntervalHandler;
+		this.filteredColumnInterval = PROTOTYPE.filteredColumnInterval;
+		this.columnNullCountHandler = PROTOTYPE.columnNullCountHandler;
+		this.columnOriginNullCountHandler = PROTOTYPE.columnOriginNullCountHandler;
+		this.uniqueGroupsHandler = PROTOTYPE.uniqueGroupsHandler;
+		this.distributionHandler = PROTOTYPE.distributionHandler;
+		this.modifiedMonotonicityHandler = PROTOTYPE.modifiedMonotonicityHandler;
+	}
+
+	/**
+	 * Creates and initializes the instance that will serve as a prototype for
+	 * all other instances.
+	 */
+	private FlinkRelMetadataQuery(boolean dummy) {
+		super(RelMetadataQuery.THREAD_PROVIDERS.get(), RelMetadataQuery.EMPTY);
+		this.columnIntervalHandler =
+				RelMetadataQuery.initialHandler(FlinkMetadata.ColumnInterval.Handler.class);
+		this.filteredColumnInterval =
+				RelMetadataQuery.initialHandler(FlinkMetadata.FilteredColumnInterval.Handler.class);
+		this.columnNullCountHandler =
+				RelMetadataQuery.initialHandler(FlinkMetadata.ColumnNullCount.Handler.class);
+		this.columnOriginNullCountHandler =
+				RelMetadataQuery.initialHandler(FlinkMetadata.ColumnOriginNullCount.Handler.class);
+		this.uniqueGroupsHandler =
+				RelMetadataQuery.initialHandler(FlinkMetadata.UniqueGroups.Handler.class);
+		this.distributionHandler =
+				RelMetadataQuery.initialHandler(FlinkMetadata.FlinkDistribution.Handler.class);
+		this.modifiedMonotonicityHandler =
+				RelMetadataQuery.initialHandler(FlinkMetadata.ModifiedMonotonicity.Handler.class);
+	}
+
+	/**
+	 * Returns the {@link FlinkMetadata.ColumnInterval} statistic.
+	 *
+	 * @param rel the relational expression
+	 * @param index the index of the given column
+	 * @return the interval of the given column of a specified relational expression.
+	 *         Returns null if the interval cannot be estimated.
+	 *         Returns {@link org.apache.flink.table.plan.stats.EmptyValueInterval}
+	 *         if the column does not contain any value except for null.
+	 */
+	public ValueInterval getColumnInterval(RelNode rel, int index) {
+		for (; ; ) {
+			try {
+				return columnIntervalHandler.getColumnInterval(rel, this, index);
+			} catch (JaninoRelMetadataProvider.NoHandler e) {
+				columnIntervalHandler = revise(e.relClass, FlinkMetadata.ColumnInterval.DEF);
+			}
+		}
+	}
+
+	/**
+	 * Returns the {@link FlinkMetadata.ColumnInterval} of the given column
+	 * under the given filter argument.
+	 *
+	 * @param rel the relational expression
+	 * @param columnIndex the index of the given column
+	 * @param filterArg the index of the filter argument
+	 * @return the interval of the given column of a specified relational expression.
+	 *         Returns null if the interval cannot be estimated.
+	 *         Returns {@link org.apache.flink.table.plan.stats.EmptyValueInterval}
+	 *         if the column does not contain any value except for null.
+	 */
+	public ValueInterval getFilteredColumnInterval(RelNode rel, int columnIndex, int filterArg) {
+		for (; ; ) {
+			try {
+				return filteredColumnInterval.getFilteredColumnInterval(
+						rel, this, columnIndex, filterArg);
+			} catch (JaninoRelMetadataProvider.NoHandler e) {
+				filteredColumnInterval =
+						revise(e.relClass, FlinkMetadata.FilteredColumnInterval.DEF);
+			}
+		}
+	}
+
+	/**
+	 * Returns the null count of the given column.
+	 *
+	 * @param rel the relational expression
+	 * @param index the index of the given column
+	 * @return the null count of the given column if it can be estimated, otherwise null.
+	 */
+	public Double getColumnNullCount(RelNode rel, int index) {
+		for (; ; ) {
+			try {
+				return columnNullCountHandler.getColumnNullCount(rel, this, index);
+			} catch (JaninoRelMetadataProvider.NoHandler e) {
+				columnNullCountHandler = revise(e.relClass, FlinkMetadata.ColumnNullCount.DEF);
+			}
+		}
+	}
+
+	/**
+	 * Returns the origin null count of the given column.
+	 *
+	 * @param rel the relational expression
+	 * @param index the index of the given column
+	 * @return the null count of the given column if it can be estimated, otherwise null.
+	 */
+	public Double getColumnOriginNullCount(RelNode rel, int index) {
+		for (; ; ) {
+			try {
+				return columnOriginNullCountHandler.getColumnOriginNullCount(rel, this, index);
+			} catch (JaninoRelMetadataProvider.NoHandler e) {
+				columnOriginNullCountHandler =
+						revise(e.relClass, FlinkMetadata.ColumnOriginNullCount.DEF);
+			}
+		}
+	}
+
+	/**
+	 * Returns the (minimum) unique groups of the given columns.
+	 *
+	 * @param rel the relational expression
+	 * @param columns the given columns in a specified relational expression.
+	 *        The given columns should not be null.
+	 * @return the (minimum) unique columns which should be a sub-collection of the given columns,
+	 *         and should not be null or empty. If no unique columns can be found, returns the
+	 *         given columns.
+	 */
+	public ImmutableBitSet getUniqueGroups(RelNode rel, ImmutableBitSet columns) {
+		for (; ; ) {
+			try {
+				Preconditions.checkArgument(columns != null);
+				if (columns.isEmpty()) {
+					return columns;
+				}
+				ImmutableBitSet uniqueGroups =
+						uniqueGroupsHandler.getUniqueGroups(rel, this, columns);
+				Preconditions.checkArgument(uniqueGroups != null && !uniqueGroups.isEmpty());
+				Preconditions.checkArgument(columns.contains(uniqueGroups));
+				return uniqueGroups;
+			} catch (JaninoRelMetadataProvider.NoHandler e) {
+				uniqueGroupsHandler = revise(e.relClass, FlinkMetadata.UniqueGroups.DEF);
+			}
+		}
+	}
+
+	/**
+	 * Returns the {@link FlinkRelDistribution} statistic.
+	 *
+	 * @param rel the relational expression
+	 * @return description of how the rows in the relational expression are
+	 *         physically distributed
+	 */
+	public FlinkRelDistribution flinkDistribution(RelNode rel) {
+		for (; ; ) {
+			try {
+				return distributionHandler.flinkDistribution(rel, this);
+			} catch (JaninoRelMetadataProvider.NoHandler e) {
+				distributionHandler = revise(e.relClass, FlinkMetadata.FlinkDistribution.DEF);
+			}
+		}
+	}
+
+	/**
+	 * Returns the {@link RelModifiedMonotonicity} statistic.
+	 *
+	 * @param rel the relational expression
+	 * @return the monotonicity for the corresponding RelNode
+	 */
+	public RelModifiedMonotonicity getRelModifiedMonotonicity(RelNode rel) {
+		for (; ; ) {
+			try {
+				return modifiedMonotonicityHandler.getRelModifiedMonotonicity(rel, this);
+			} catch (JaninoRelMetadataProvider.NoHandler e) {
+				modifiedMonotonicityHandler =
+						revise(e.relClass, FlinkMetadata.ModifiedMonotonicity.DEF);
+			}
+		}
+	}
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.scala
deleted file mode 100644
index 260792f..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.scala
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.metadata
-
-import org.apache.flink.table.JDouble
-import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, RelModifiedMonotonicity}
-import org.apache.flink.table.plan.metadata.FlinkMetadata._
-import org.apache.flink.table.plan.stats.ValueInterval
-
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQuery}
-import org.apache.calcite.util.ImmutableBitSet
-
-/**
-  * RelMetadataQuery provides a strongly-typed facade on top of
-  * [[org.apache.calcite.rel.metadata.RelMetadataProvider]]
-  * for the set of relational expression metadata queries defined as standard within Calcite.
-  * FlinkRelMetadataQuery class is to add flink specified metadata queries.
-  *
-  * @param metadataProvider provider which provides metadata
-  * @param prototype        the prototype which provides metadata handlers
-  */
-class FlinkRelMetadataQuery private(
-    metadataProvider: JaninoRelMetadataProvider,
-    prototype: RelMetadataQuery) extends RelMetadataQuery(metadataProvider, prototype) {
-
-  private[this] var columnIntervalHandler: ColumnInterval.Handler = _
-  private[this] var filteredColumnInterval: FilteredColumnInterval.Handler = _
-  private[this] var columnNullCountHandler: ColumnNullCount.Handler = _
-  private[this] var columnOriginNullCountHandler: ColumnOriginNullCount.Handler = _
-  private[this] var uniqueGroupsHandler: UniqueGroups.Handler = _
-  private[this] var distributionHandler: FlinkDistribution.Handler = _
-  private[this] var modifiedMonotonicityHandler: ModifiedMonotonicity.Handler = _
-
-  private def this() {
-    this(RelMetadataQuery.THREAD_PROVIDERS.get, RelMetadataQuery.EMPTY)
-    this.columnIntervalHandler = RelMetadataQuery.initialHandler(classOf[ColumnInterval.Handler])
-    this.filteredColumnInterval =
-      RelMetadataQuery.initialHandler(classOf[FilteredColumnInterval.Handler])
-    this.columnNullCountHandler = RelMetadataQuery.initialHandler(classOf[ColumnNullCount.Handler])
-    this.columnOriginNullCountHandler =
-      RelMetadataQuery.initialHandler(classOf[ColumnOriginNullCount.Handler])
-    this.uniqueGroupsHandler = RelMetadataQuery.initialHandler(classOf[UniqueGroups.Handler])
-    this.distributionHandler = RelMetadataQuery.initialHandler(classOf[FlinkDistribution.Handler])
-    this.modifiedMonotonicityHandler =
-      RelMetadataQuery.initialHandler(classOf[ModifiedMonotonicity.Handler])
-  }
-
-  /**
-    * Returns the [[ColumnInterval]] statistic.
-    *
-    * @param rel   the relational expression
-    * @param index the index of the given column
-    * @return the interval of the given column of a specified relational expression.
-    *         Returns null if interval cannot be estimated,
-    *         Returns [[org.apache.flink.table.plan.stats.EmptyValueInterval]]
-    *         if column values does not contains any value except for null.
-    */
-  def getColumnInterval(rel: RelNode, index: Int): ValueInterval = {
-    try {
-      columnIntervalHandler.getColumnInterval(rel, this, index)
-    } catch {
-      case e: JaninoRelMetadataProvider.NoHandler =>
-        columnIntervalHandler = revise(e.relClass, FlinkMetadata.ColumnInterval.DEF)
-        getColumnInterval(rel, index)
-    }
-  }
-
-  /**
-    * Returns the [[ColumnInterval]] of the given column under the given filter argument.
-    *
-    * @param rel   the relational expression
-    * @param columnIndex the index of the given column
-    * @param filterArg the index of the filter argument
-    * @return the interval of the given column of a specified relational expression.
-    *         Returns null if interval cannot be estimated,
-    *         Returns [[org.apache.flink.table.plan.stats.EmptyValueInterval]]
-    *         if column values does not contains any value except for null.
-    */
-  def getFilteredColumnInterval(rel: RelNode, columnIndex: Int, filterArg: Int): ValueInterval = {
-    try {
-      filteredColumnInterval.getFilteredColumnInterval(
-        rel, this, columnIndex, filterArg)
-    } catch {
-      case e: JaninoRelMetadataProvider.NoHandler =>
-        filteredColumnInterval = revise(e.relClass, FlinkMetadata.FilteredColumnInterval.DEF)
-        getFilteredColumnInterval(rel, columnIndex, filterArg)
-    }
-  }
-
-  /**
-    * Returns the null count of the given column.
-    *
-    * @param rel   the relational expression
-    * @param index the index of the given column
-    * @return the null count of the given column if can be estimated, else return null.
-    */
-  def getColumnNullCount(rel: RelNode, index: Int): JDouble = {
-    try {
-      columnNullCountHandler.getColumnNullCount(rel, this, index)
-    } catch {
-      case e: JaninoRelMetadataProvider.NoHandler =>
-        columnNullCountHandler = revise(e.relClass, FlinkMetadata.ColumnNullCount.DEF)
-        getColumnNullCount(rel, index)
-    }
-  }
-
-  /**
-    * Returns origin null count of the given column.
-    *
-    * @param rel   the relational expression
-    * @param index the index of the given column
-    * @return the null count of the given column if can be estimated, else return null.
-    */
-  def getColumnOriginNullCount(rel: RelNode, index: Int): JDouble = {
-    try {
-      columnOriginNullCountHandler.getColumnOriginNullCount(rel, this, index)
-    } catch {
-      case e: JaninoRelMetadataProvider.NoHandler =>
-        columnOriginNullCountHandler = revise(e.relClass, FlinkMetadata.ColumnOriginNullCount.DEF)
-        getColumnOriginNullCount(rel, index)
-    }
-  }
-
-  /**
-    * Returns the (minimum) unique groups of the given columns.
-    *
-    * @param rel the relational expression
-    * @param columns the given columns in a specified relational expression.
-    *                The given columns should not be null.
-    * @return the (minimum) unique columns which should be a sub-collection of the given columns,
-    *         and should not be null or empty. If none unique columns can be found, return the
-    *         given columns.
-    */
-  def getUniqueGroups(rel: RelNode, columns: ImmutableBitSet): ImmutableBitSet = {
-    try {
-      require(columns != null)
-      if (columns.isEmpty) {
-        return columns
-      }
-      val uniqueGroups = uniqueGroupsHandler.getUniqueGroups(rel, this, columns)
-      require(uniqueGroups != null && !uniqueGroups.isEmpty)
-      require(columns.contains(uniqueGroups))
-      uniqueGroups
-    } catch {
-      case e: JaninoRelMetadataProvider.NoHandler =>
-        uniqueGroupsHandler = revise(e.relClass, FlinkMetadata.UniqueGroups.DEF)
-        getUniqueGroups(rel, columns)
-    }
-  }
-
-  /**
-    * Returns the [[FlinkRelDistribution]] statistic.
-    *
-    * @param rel the relational expression
-    * @return description of how the rows in the relational expression are
-    *         physically distributed
-    */
-  def flinkDistribution(rel: RelNode): FlinkRelDistribution = {
-    try {
-      distributionHandler.flinkDistribution(rel, this)
-    } catch {
-      case e: JaninoRelMetadataProvider.NoHandler =>
-        distributionHandler = revise(e.relClass, FlinkMetadata.FlinkDistribution.DEF)
-        flinkDistribution(rel)
-    }
-  }
-
-  /**
-    * Returns the [[RelModifiedMonotonicity]] statistic.
-    *
-    * @param rel the relational expression
-    * @return the monotonicity for the corresponding RelNode
-    */
-  def getRelModifiedMonotonicity(rel: RelNode): RelModifiedMonotonicity = {
-    try {
-      modifiedMonotonicityHandler.getRelModifiedMonotonicity(rel, this)
-    } catch {
-      case e: JaninoRelMetadataProvider.NoHandler =>
-        modifiedMonotonicityHandler = revise(e.relClass, FlinkMetadata.ModifiedMonotonicity.DEF)
-        getRelModifiedMonotonicity(rel)
-    }
-  }
-
-}
-
-object FlinkRelMetadataQuery {
-
-  def instance(): FlinkRelMetadataQuery = new FlinkRelMetadataQuery()
-
-  /**
-    * Reuse input metadataQuery instance if it could cast to FlinkRelMetadataQuery class,
-    * or create one if not.
-    *
-    * @param mq metadataQuery which try to reuse
-    * @return a FlinkRelMetadataQuery instance
-    */
-  def reuseOrCreate(mq: RelMetadataQuery): FlinkRelMetadataQuery = {
-    mq match {
-      case q: FlinkRelMetadataQuery => q
-      case _ => FlinkRelMetadataQuery.instance()
-    }
-  }
-}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
index e91a152..c2ce840 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
@@ -138,7 +138,7 @@ final class TestingAppendBaseRowSink(
     this(rowTypeInfo, TimeZone.getTimeZone("UTC"))
   }
 
-  def invoke(value: BaseRow): Unit = localResults +=
+  override def invoke(value: BaseRow): Unit = localResults +=
     BaseRowTestUtil.baseRowToString(value, rowTypeInfo, tz)
 
   def getAppendResults: List[String] = getResults
@@ -150,7 +150,7 @@ final class TestingAppendSink(tz: TimeZone) extends AbstractExactlyOnceSink[Row]
     this(TimeZone.getTimeZone("UTC"))
   }
 
-  def invoke(value: Row): Unit = localResults += TestSinkUtil.rowToString(value, tz)
+  override def invoke(value: Row): Unit = localResults += TestSinkUtil.rowToString(value, tz)
 
   def getAppendResults: List[String] = getResults
 }
@@ -210,7 +210,7 @@ final class TestingUpsertSink(keys: Array[Int], tz: TimeZone)
     }
   }
 
-  def invoke(d: (Boolean, BaseRow)): Unit = {
+  override def invoke(d: (Boolean, BaseRow)): Unit = {
     this.synchronized {
       val wrapRow = new GenericRow(2)
       wrapRow.setField(0, d._1)
@@ -438,7 +438,7 @@ class TestingRetractSink(tz: TimeZone)
     }
   }
 
-  def invoke(v: (Boolean, Row)): Unit = {
+  override def invoke(v: (Boolean, Row)): Unit = {
     this.synchronized {
       val tupleString = "(" + v._1.toString + "," + TestSinkUtil.rowToString(v._2, tz) + ")"
       localResults += tupleString