Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/06/19 10:53:24 UTC

[GitHub] flink pull request #4142: [FLINK-6948] Harden EnumValueSerializer to detect ...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4142

    [FLINK-6948] Harden EnumValueSerializer to detect changed enum indices

    This PR changes the serialization format of the ScalaEnumSerializerConfigSnapshot to also include the
    ordinal value of an enum value when being deserialized. This makes it possible to detect whether the
    ordinal values have changed and, thus, whether migration is required.
    
    IMPORTANT: This PR changes the serialization format of ScalaEnumSerializerConfigSnapshot.
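
To illustrate why recording the ordinal value helps, here is a minimal sketch of a snapshot that stores (name, id) pairs and a check that compares them against the current enum. It is not the actual ScalaEnumSerializerConfigSnapshot implementation; `EnumSnapshotSketch`, `snapshot`, `requiresMigration` and the three `SketchEnum*` objects are hypothetical names used only for this example.

```scala
// Hypothetical enums standing in for an "old" and two "new" versions of a type.
object SketchEnum extends Enumeration {
  val A, B, C = Value
}

object SketchEnumAppended extends Enumeration {
  val A, B, C, D = Value
}

object SketchEnumReindexed extends Enumeration {
  val A = Value(42)
  val B = Value(5)
  val C = Value(1337)
}

// Assumption: a simplified stand-in for the config snapshot, not Flink's API.
object EnumSnapshotSketch {

  /** Records each enum value's name together with its id (ordinal value). */
  def snapshot(e: Enumeration): List[(String, Int)] =
    e.values.toList.map(v => (v.toString, v.id))

  /**
    * Migration is required if any previously recorded (name, id) pair is
    * missing from the current enum or is now mapped to a different id.
    * Values that were only appended do not trigger a migration.
    */
  def requiresMigration(previous: List[(String, Int)], current: Enumeration): Boolean = {
    val currentIds: Map[String, Int] = snapshot(current).toMap
    previous.exists { case (name, id) => !currentIds.get(name).contains(id) }
  }
}
```

With a snapshot of names alone, a change such as `SketchEnumReindexed` could only be inferred indirectly from the ordering of the values; with the ids recorded, the comparison is explicit.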
    
    cc @tzulitai 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink hardenEnumValueSerializerTest

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4142.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4142
    
----
commit 80b2d61b58be06f9e3f40b35f0b215421ad86890
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-06-13T21:26:10Z

    [FLINK-6836] [tests] Fix YARNSessionCapacitySchedulerITCase to work with Hadoop 2.6.5, 2.7.3 and 2.8.0
    
    Due to MNG-5899, Maven cannot resolve dependency-reduced POMs in a multi-project build. Therefore,
    flink-yarn-tests pulls in a wrong version of org.apache.httpcomponents.httpclient, which does not work
    together with Hadoop's ServletUtils. As a solution, we have to move the dependency management for the
    httpclient and httpcore versions into the parent pom.xml.
    
    Another problem is the version of these libraries, which was recently bumped. In 4.4, httpclient
    changed its behaviour such that URLEncodedUtils#parse(String, Charset) now throws an NPE if the first
    parameter is null. In 4.2.6, an empty list was returned instead. Due to this incompatibility, we reverted
    the change and set the version to its previous value.
    
    Bump httpclient to 4.5.3 and httpcore to 4.4.6
    
    This closes #4120.

commit 7c7fe493f2dd766a8ddf3de240f9447f857b423a
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-06-14T14:53:49Z

    [FLINK-6921] [serializer] Allow EnumValueSerializer to deal with appended enum values
    
    The problem was that we didn't check the bounds of the array with the enum names contained
    in the ScalaEnumSerializerConfigSnapshot.
    
    This PR also adds an Enumeration upgrade test which makes sure that appended fields are
    supported without migration. Moreover, it checks that a field removal and an order change
    each lead to a required migration.
    
    This closes #4126.

commit 9576530a0c5c3fe866bf87f126b29eb5acb56ba2
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-06-19T10:49:05Z

    [FLINK-6948] Harden EnumValueSerializer to detect changed enum indices
    
    This PR changes the serialization format of the ScalaEnumSerializerConfigSnapshot to also include the
    ordinal value of an enum value when being deserialized. This makes it possible to detect whether the
    ordinal values have changed and, thus, whether migration is required.
    
    IMPORTANT: This PR changes the serialization format of ScalaEnumSerializerConfigSnapshot.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4142: [FLINK-6948] Harden EnumValueSerializer to detect ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4142#discussion_r122687300
  
    --- Diff: flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.typeutils
    +
    +import java.io._
    +import java.net.{URL, URLClassLoader}
    +
    +import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializerSerializationUtil}
    +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
    +import org.apache.flink.util.TestLogger
    +import org.junit.rules.TemporaryFolder
    +import org.junit.{Rule, Test}
    +import org.junit.Assert._
    +import org.scalatest.junit.JUnitSuiteLike
    +
    +import scala.reflect.NameTransformer
    +import scala.tools.nsc.reporters.ConsoleReporter
    +import scala.tools.nsc.{GenericRunnerSettings, Global}
    +
    +class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
    +
    +  private val _tempFolder = new TemporaryFolder()
    +
    +  @Rule
    +  def tempFolder = _tempFolder
    +
    +  val enumName = "EnumValueSerializerUpgradeTestEnum"
    +
    +  val enumA =
    +    s"""
    +      |@SerialVersionUID(1L)
    +      |object $enumName extends Enumeration {
    +      |  val A, B, C = Value
    +      |}
    +    """.stripMargin
    +
    +  val enumB =
    +    s"""
    +       |@SerialVersionUID(1L)
    +       |object $enumName extends Enumeration {
    +       |  val A, B, C, D = Value
    +       |}
    +    """.stripMargin
    +
    +  val enumC =
    +    s"""
    +       |@SerialVersionUID(1L)
    +       |object $enumName extends Enumeration {
    +       |  val A, C = Value
    +       |}
    +    """.stripMargin
    +
    +  val enumD =
    +    s"""
    +       |@SerialVersionUID(1L)
    +       |object $enumName extends Enumeration {
    +       |  val A, C, B = Value
    +       |}
    +    """.stripMargin
    +
    +  val enumE =
    +    s"""
    +       |@SerialVersionUID(1L)
    +       |object $enumName extends Enumeration {
    +       |  val A = Value(42)
    +       |  val B = Value(5)
    +       |  val C = Value(1337)
    +       |}
    +    """.stripMargin
    +
    +  /**
    +    * Check that identical enums don't require migration
    +    */
    +  @Test
    +  def checkIdenticalEnums(): Unit = {
    +    assertFalse(checkCompatibility(enumA, enumA).isRequiresMigration)
    +  }
    +
    +  /**
    +    * Check that appending fields to the enum does not require migration
    +    */
    +  @Test
    +  def checkAppendedField(): Unit = {
    +    assertFalse(checkCompatibility(enumA, enumB).isRequiresMigration)
    +  }
    +
    +  /**
    +    * Check that removing enum fields requires migration
    +    */
    +  @Test
    +  def checkRemovedField(): Unit = {
    +    assertTrue(checkCompatibility(enumA, enumC).isRequiresMigration)
    +  }
    +
    +  /**
    +    * Check that changing the enum field order requires migration
    +    */
    +  @Test
    +  def checkDifferentFieldOrder(): Unit = {
    +    assertTrue(checkCompatibility(enumA, enumD).isRequiresMigration)
    +  }
    +
    +  /**
    +    * Check that changing the enum ids causes a migration
    +    */
    +  @Test
    +  def checkDifferentIds(): Unit = {
    +    assertTrue(
    +      "Different ids should cause a migration.",
    +      checkCompatibility(enumA, enumE).isRequiresMigration)
    +  }
    +
    +  def checkCompatibility(enumSourceA: String, enumSourceB: String)
    +    : CompatibilityResult[Enumeration#Value] = {
    +    import EnumValueSerializerUpgradeTest._
    +
    +    val classLoader = compileAndLoadEnum(tempFolder.newFolder(), s"$enumName.scala", enumSourceA)
    +
    +    val enum = instantiateEnum[Enumeration](classLoader, enumName)
    +
    +    val enumValueSerializer = new EnumValueSerializer(enum)
    +    val snapshot = enumValueSerializer.snapshotConfiguration()
    +
    +    val baos = new ByteArrayOutputStream()
    +    val output = new DataOutputViewStreamWrapper(baos)
    +    TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(output, snapshot)
    +
    +    output.close()
    +    baos.close()
    +
    +    val bais = new ByteArrayInputStream(baos.toByteArray)
    +    val input = new DataInputViewStreamWrapper(bais)
    +
    +    val classLoader2 = compileAndLoadEnum(tempFolder.newFolder(), s"$enumName.scala", enumSourceB)
    +
    +    val snapshot2 = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
    +      input,
    +      classLoader2)
    +    val enum2 = instantiateEnum[Enumeration](classLoader2, enumName)
    +
    +    val enumValueSerializer2 = new EnumValueSerializer(enum2)
    +    enumValueSerializer2.ensureCompatibility(snapshot2)
    +  }
    +}
    +
    +object EnumValueSerializerUpgradeTest {
    +  def compileAndLoadEnum(root: File, filename: String, source: String): ClassLoader = {
    +    val file = writeSourceFile(root, filename, source)
    +
    +    compileScalaFile(file)
    +
    +    new URLClassLoader(
    +      Array[URL](root.toURI.toURL),
    +      Thread.currentThread().getContextClassLoader)
    +  }
    +
    +  def instantiateEnum[T <: Enumeration](classLoader: ClassLoader, enumName: String): T = {
    +    val clazz = classLoader.loadClass(enumName + "$").asInstanceOf[Class[_ <: Enumeration]]
    +    val field = clazz.getField(NameTransformer.MODULE_INSTANCE_NAME)
    +
    +    field.get(null).asInstanceOf[T]
    +  }
    +
    +  def writeSourceFile(root: File, filename: String, source: String): File = {
    +    val file = new File(root, filename)
    +    val fileWriter = new FileWriter(file)
    +
    +    fileWriter.write(source)
    +
    +    fileWriter.close()
    +
    +    file
    +  }
    +
    +  def compileScalaFile(file: File): Unit = {
    +    val in = new BufferedReader(new StringReader(""))
    +    val out = new PrintWriter(new BufferedWriter(
    +      new OutputStreamWriter(System.out)))
    +
    +    val settings = new GenericRunnerSettings(out.println _)
    +
    +    val classLoader = Thread.currentThread().getContextClassLoader
    +
    +    val urls = classLoader match {
    +      case urlClassLoader: URLClassLoader =>
    +        urlClassLoader.getURLs
    +      case x => throw new IllegalStateException(s"Not possible to extract URLs " +
    +        s"from class loader $x.")
    +    }
    +
    +    settings.classpath.value = urls.map(_.toString).mkString(java.io.File.pathSeparator)
    +    settings.outdir.value = file.getParent
    +
    +    val reporter = new ConsoleReporter(settings)
    +    val global = new Global(settings, reporter)
    +    val run = new global.Run
    +
    +    run.compile(List(file.getAbsolutePath))
    +
    +    reporter.printSummary()
    --- End diff --
    
    Would this add too much unnecessary output to the test log?



[GitHub] flink issue #4142: [FLINK-6948] Harden EnumValueSerializer to detect changed...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4142
  
    The tests are failing due to checkstyle.
    I'll fix that, address my comments, and run the tests locally. If they are green, I will proceed to merge this into `master` and `release-1.3`.



[GitHub] flink pull request #4142: [FLINK-6948] Harden EnumValueSerializer to detect ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4142#discussion_r122686133
  
    --- Diff: flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala ---
    @@ -0,0 +1,119 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.typeutils
    +
    +import java.io._
    +
    +import org.apache.flink.api.scala.typeutils.EnumValueSerializer.ScalaEnumSerializerConfigSnapshot
    +import org.apache.flink.core.testutils.CommonTestUtils
    +import org.apache.flink.runtime.util.{DataInputDeserializer, DataOutputSerializer}
    +import org.apache.flink.util.TestLogger
    +import org.junit.{Ignore, Test}
    +import org.junit.Assert._
    +import org.scalatest.junit.JUnitSuiteLike
    +
    +class EnumValueSerializerTest extends TestLogger with JUnitSuiteLike {
    +
    +  /**
    +    * Tests that the snapshot configuration can be created and that the serializer
    +    * is compatible when being called with the created serializer snapshot
    +    *
    +    * FLINK-6914
    +    */
    +  @Test
    +  def testEnumValueSerializerEnsureCompatibilityIdempotency() {
    +    val enumSerializer = new EnumValueSerializer(Letters)
    +
    +    val snapshot = enumSerializer.snapshotConfiguration()
    +
    +    assertFalse(enumSerializer.ensureCompatibility(snapshot).isRequiresMigration)
    +  }
    +
    +  @Ignore("This test case is only used to create a " +
    +    "TypeSerializerConfigurationSnapshot for the EnumValueSerializer")
    +  @Test
    +  def createEnumValueSerializerConfigSnapshot(): Unit = {
    +    val enumValueSerializer = new EnumValueSerializer(Letters)
    +
    +    val snapshot = enumValueSerializer.snapshotConfiguration()
    +
    +    val out = new DataOutputSerializer(128)
    +
    +    snapshot.write(out)
    +    val buffer = out.getCopyOfBuffer
    +
    +    val tmpDir = CommonTestUtils.getTempDir
    +    val configFile = new File(tmpDir, "EnumValueSerializerConfigSnapshot-1.3.1")
    +
    +    log.info(s"Writing EnumValueSerializerConfigSnapshot-1.3.1 to $configFile")
    +
    +    val outputStream = new DataOutputStream(new FileOutputStream(configFile))
    +
    +    try {
    +      outputStream.writeInt(buffer.length)
    +      outputStream.write(buffer)
    +    } finally {
    +      outputStream.close()
    +    }
    +  }
    +
    +  /**
    +    * Check backwards compatibility between 1.3.1 and 1.3.2 since the
    +    * ScalaEnumSerializerConfigSnapshot format changed.
    +    *
    +    * FLINK-694X
    --- End diff --
    
    nit: I would use `@see` for this.
    Also, directly refer to FLINK-6948, because that's the exact issue that required the format change.
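
For reference, a sketch of what the suggested Scaladoc could look like with an `@see` tag pointing at the exact issue. The enclosing object `SeeTagSketch` and the method name `checkBackwardsCompatibility` are placeholders, not names taken from the PR, and the empty body only exists so that the snippet compiles on its own.

```scala
object SeeTagSketch {

  /**
    * Check backwards compatibility between 1.3.1 and 1.3.2 since the
    * ScalaEnumSerializerConfigSnapshot format changed.
    *
    * @see FLINK-6948
    */
  def checkBackwardsCompatibility(): Unit = () // placeholder body for illustration
}
```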



[GitHub] flink pull request #4142: [FLINK-6948] Harden EnumValueSerializer to detect ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4142



[GitHub] flink pull request #4142: [FLINK-6948] Harden EnumValueSerializer to detect ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4142#discussion_r122682551
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala ---
    @@ -150,8 +162,29 @@ object EnumValueSerializer {
                 enumClass = InstantiationUtil.deserializeObject(
                   inViewWrapper, getUserCodeClassLoader)
    --- End diff --
    
    Since we're changing the serialization format, we might as well also exclude Java serialization here for good now. Otherwise we'll need to bump the version again afterwards.
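
One possible reading of this suggestion, sketched under assumptions: instead of Java-serializing the `Class` object, the snapshot could write only the fully qualified class name and resolve it again through the user-code class loader when reading. `ClassNameFormatSketch`, `writeEnumClass`, `readEnumClass` and `roundTrip` are hypothetical helpers built on plain `java.io` streams, not Flink's `DataOutputView`/`DataInputView` API.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Assumption: simplified helpers for illustration, not part of Flink.
object ClassNameFormatSketch {

  /** Writes only the class name as a UTF string; no Java serialization involved. */
  def writeEnumClass(out: DataOutputStream, enumClass: Class[_]): Unit =
    out.writeUTF(enumClass.getName)

  /** Reads the class name back and resolves it with the given class loader. */
  def readEnumClass(in: DataInputStream, classLoader: ClassLoader): Class[_] =
    Class.forName(in.readUTF(), true, classLoader)

  /** Writes the class to an in-memory buffer and reads it back again. */
  def roundTrip(enumClass: Class[_], classLoader: ClassLoader): Class[_] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    try writeEnumClass(out, enumClass) finally out.close()

    val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try readEnumClass(in, classLoader) finally in.close()
  }
}
```

A name-based format like this stays readable even if the Java serialization details of the class change, at the cost of failing with a `ClassNotFoundException` when the class has been renamed or removed.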



[GitHub] flink pull request #4142: [FLINK-6948] Harden EnumValueSerializer to detect ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4142#discussion_r122684692
  
    --- Diff: flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala ---
    @@ -0,0 +1,119 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.typeutils
    +
    +import java.io._
    +
    +import org.apache.flink.api.scala.typeutils.EnumValueSerializer.ScalaEnumSerializerConfigSnapshot
    +import org.apache.flink.core.testutils.CommonTestUtils
    +import org.apache.flink.runtime.util.{DataInputDeserializer, DataOutputSerializer}
    +import org.apache.flink.util.TestLogger
    +import org.junit.{Ignore, Test}
    +import org.junit.Assert._
    +import org.scalatest.junit.JUnitSuiteLike
    +
    +class EnumValueSerializerTest extends TestLogger with JUnitSuiteLike {
    +
    +  /**
    +    * Tests that the snapshot configuration can be created and that the serializer
    +    * is compatible when being called with the created serializer snapshot
    +    *
    +    * FLINK-6914
    --- End diff --
    
    nit: I would use `@see` for this

