You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/12/18 04:08:40 UTC

[GitHub] asfgit closed pull request #7267: [FLINK-11083][Table&SQL] CRowSerializerConfigSnapshot is not instantiable

asfgit closed pull request #7267: [FLINK-11083][Table&SQL] CRowSerializerConfigSnapshot is not instantiable
URL: https://github.com/apache/flink/pull/7267
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would otherwise hide it once the pull request is merged):

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
index 0ce3aee3739..b3fe5085151 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
@@ -81,7 +81,7 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
   // --------------------------------------------------------------------------------------------
 
   override def snapshotConfiguration(): TypeSerializerConfigSnapshot[CRow] = {
-    new CRowSerializer.CRowSerializerConfigSnapshot(rowSerializer)
+    new CRowSerializer.CRowSerializerConfigSnapshot(Array(rowSerializer))
   }
 
   override def ensureCompatibility(
@@ -115,9 +115,13 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
 
 object CRowSerializer {
 
-  class CRowSerializerConfigSnapshot(rowSerializers: TypeSerializer[Row]*)
+  class CRowSerializerConfigSnapshot(rowSerializers: Array[TypeSerializer[Row]])
     extends CompositeTypeSerializerConfigSnapshot[CRow](rowSerializers: _*) {
 
+    def this() {
+      this(Array.empty)
+    }
+
     override def getVersion: Int = CRowSerializerConfigSnapshot.VERSION
   }
 
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala
index 7483b04d9ca..055501a6a01 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala
@@ -18,8 +18,20 @@
 
 package org.apache.flink.table.runtime.types
 
-import org.apache.flink.util.TestLogger
-import org.junit.Test
+import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
+import org.apache.flink.api.common.typeinfo.Types
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, KeyedProcessOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, InstantiationUtil, TestLogger}
+
+import org.junit.{Assert, Test}
 
 class CRowSerializerTest extends TestLogger {
 
@@ -29,6 +41,70 @@ class CRowSerializerTest extends TestLogger {
   @Test
   def testDefaultConstructor(): Unit = {
     new CRowSerializer.CRowSerializerConfigSnapshot()
+
+    InstantiationUtil.instantiate(classOf[CRowSerializer.CRowSerializerConfigSnapshot])
+  }
+
+  @Test
+  def testStateRestore(): Unit = {
+
+    class IKeyedProcessFunction extends KeyedProcessFunction[Integer, Integer, Integer] {
+      var state: ListState[CRow] = _
+      override def open(parameters: Configuration): Unit = {
+        val stateDesc = new ListStateDescriptor[CRow]("CRow",
+          new CRowTypeInfo(new RowTypeInfo(Types.INT)))
+        state = getRuntimeContext.getListState(stateDesc)
+      }
+      override def processElement(value: Integer,
+          ctx: KeyedProcessFunction[Integer, Integer, Integer]#Context,
+          out: Collector[Integer]): Unit = {
+        state.add(new CRow(Row.of(value), true))
+      }
+    }
+
+    val operator = new KeyedProcessOperator[Integer, Integer, Integer](new IKeyedProcessFunction)
+
+    var testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, Integer](
+      operator,
+      new KeySelector[Integer, Integer] {
+        override def getKey(value: Integer): Integer= -1
+      },
+      Types.INT, 1, 1, 0)
+    testHarness.setup()
+    testHarness.open()
+
+    testHarness.processElement(new StreamRecord[Integer](1, 1L))
+    testHarness.processElement(new StreamRecord[Integer](2, 1L))
+    testHarness.processElement(new StreamRecord[Integer](3, 1L))
+
+    Assert.assertEquals(1, numKeyedStateEntries(operator))
+
+    val snapshot = testHarness.snapshot(0L, 0L)
+    testHarness.close()
+
+    testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, Integer](
+      operator,
+      new KeySelector[Integer, Integer] {
+        override def getKey(value: Integer): Integer= -1
+      },
+      Types.INT, 1, 1, 0)
+    testHarness.setup()
+
+    testHarness.initializeState(snapshot)
+
+    testHarness.open()
+
+    Assert.assertEquals(1, numKeyedStateEntries(operator))
+
+    testHarness.close()
+  }
+
+  def numKeyedStateEntries(operator: AbstractStreamOperator[_]): Int = {
+    val keyedStateBackend = operator.getKeyedStateBackend
+    keyedStateBackend match {
+      case hksb: HeapKeyedStateBackend[_] => hksb.numKeyValueStateEntries
+      case _ => throw new UnsupportedOperationException
+    }
   }
 
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services