Posted to issues@flink.apache.org by "Marc Hübner (JIRA)" <ji...@apache.org> on 2017/01/20 10:41:26 UTC

[jira] [Comment Edited] (FLINK-4719) KryoSerializer random exception

    [ https://issues.apache.org/jira/browse/FLINK-4719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15831458#comment-15831458 ] 

Marc Hübner edited comment on FLINK-4719 at 1/20/17 10:40 AM:
--------------------------------------------------------------

I get a similar exception when spilling to disk while processing large amounts of Avro files. It might not be related, because at first Flink failed to use the Avro serializer at all. But even after enforcing the Avro serializer, it still failed to deserialize after spilling to disk, so I am assuming the problem is actually located in "SpillingAdaptiveSpanningRecordDeserializer". I am using Scala and Flink 1.1.4.

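By "enforcing the Avro serializer" I mean the ExecutionConfig switch isolated below (a minimal sketch; enableForceAvro() and its counterpart enableForceKryo() are both existing ExecutionConfig methods):
{code:java}
import org.apache.flink.api.scala._

// Minimal sketch: choose the serializer Flink uses for POJO types.
val env = ExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableForceAvro()    // serialize POJOs with Avro instead of Kryo
// env.getConfig.enableForceKryo() // alternative: force Kryo even for POJOs
{code}
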
My code (I removed the import for the Document Avro class; the IO utils read from HDFS):
{code:java}
import java.io.IOException

import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.solr.common.params.MapSolrParams
import org.apache.solr.update.processor.TextProfileSignature
import org.slf4j.LoggerFactory

object DeduplicationFilterJob {
  private val LOG = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]) {
    if (args.length != 1) {
      printUsage()
      System.exit(-1)
    }
    try {
      LOG.info("Loading configuration from file " + args(0))
      val conf = ParameterTool.fromPropertiesFile(args(0)).getConfiguration
      val exitVal: Int = run(conf)
      System.exit(exitVal)
    } catch {
      case e: IOException =>
        LOG.error("Could not load job configuration", e)
        printUsage()
    }
  }

  private def printUsage() = {
    println(s"Usage: java -cp [classpath] ${getClass.getName} config")
    println("config - a valid FS path to a config/properties file")
  }

  private def run(conf: Configuration): Int = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.setGlobalJobParameters(conf)
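    // force Flink to serialize POJO types with Avro instead of Kryo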
    env.getConfig.enableForceAvro()

    val parallelism = conf.getInteger("env.parallelism", -1)
    if (parallelism > 0)
      env.setParallelism(parallelism)

    val documents: DataSet[Document] =
      DatasetIOUtils.readDocuments(env, conf.getString("input.path", null), conf.getString("input.format", "avro"))
      .name("Read documents")

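    // compute a fuzzy near-duplicate signature per document using Solr's TextProfileSignature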
    val signatureDocumentPairs: DataSet[(String, Document)] = documents.map((document: Document) => {
      val contentSignature = new TextProfileSignature
      contentSignature.init(new MapSolrParams(new java.util.HashMap[String, String]()))
      contentSignature.add(document.getText)
      val signature = new String(contentSignature.getSignature, "UTF-8")
      (signature, document)
    }).name("Compute document signatures")

    val deduplicatedDocuments = signatureDocumentPairs
      .groupBy(0)
      .reduceGroup(_.next()._2)
      .name("Select first document per signature group")

    DatasetIOUtils.writeDocuments(
      conf.getString("output.path", null),
      conf.getString("output.format", "avro"),
      deduplicatedDocuments)

    val executionResult: JobExecutionResult = env.execute("deduplication filter")
    if (executionResult != null) 0 else -1 // exit code 0 signals success
  }

}
{code}

The stack trace:
{code}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:805)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:751)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:751)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The data preparation for task 'GroupReduce (Select first document per signature group)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: 795228258
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:462)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: 795228258
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
	at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1098)
	at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
	... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: 795228258
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 795228258
	at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402)
	at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
	at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
	at org.apache.avro.reflect.ReflectDatumReader.readCollection(ReflectDatumReader.java:162)
	at org.apache.avro.reflect.ReflectDatumReader.readArray(ReflectDatumReader.java:123)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
	at org.apache.flink.api.java.typeutils.runtime.AvroSerializer.deserialize(AvroSerializer.java:123)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
	at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:155)
	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:73)
	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
{code}

The stack trace of an exception thrown before enforcing the Avro serializer:
{code}
java.lang.Exception: The data preparation for task 'GroupReduce (Select first document per signature group)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: s_business_space
Serialization trace:
attributes (de.dfki.lt.tap.ConceptMention)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:462)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: s_business_space
Serialization trace:
attributes (de.dfki.lt.tap.ConceptMention)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
	at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1098)
	at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
	... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: s_business_space
Serialization trace:
attributes (de.dfki.lt.tap.ConceptMention)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: s_business_space
Serialization trace:
attributes (de.dfki.lt.tap.ConceptMention)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:246)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:256)
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:89)
	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ClassNotFoundException: s_business_space
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
	... 20 more
2017-01-19 11:00:36,423 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Could not restart the job spree deduplication filter (b07462ca08254ddcdcad6eef2629c541).
java.lang.Exception: The data preparation for task 'GroupReduce (Select first document per signature group)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: s_business_space
Serialization trace:
attributes (de.dfki.lt.tap.ConceptMention)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:462)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: s_business_space
Serialization trace:
attributes (de.dfki.lt.tap.ConceptMention)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
	at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1098)
	at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
	... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: s_business_space
Serialization trace:
attributes (de.dfki.lt.tap.ConceptMention)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: s_business_space
Serialization trace:
attributes (de.dfki.lt.tap.ConceptMention)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:246)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:256)
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:89)
	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ClassNotFoundException: s_business_space
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
	... 20 more
2017-01-19 11:00:37,496 INFO  org.apache.flink.yarn.YarnJobManager                          - Stopping JobManager with final application status SUCCEEDED and diagnostics: Flink YARN Client requested shutdown
2017-01-19 11:00:37,497 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting down cluster with status SUCCEEDED : Flink YARN Client requested shutdown
2017-01-19 11:00:37,499 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Unregistering application from the YARN Resource Manager
2017-01-19 11:00:37,509 INFO  org.apache.flink.yarn.YarnJobManager                          - Stopping JobManager akka.tcp://flink@10.13.2.6:40283/user/jobmanager.
2017-01-19 11:00:37,509 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         - Waiting for application to be successfully unregistered.
2017-01-19 11:00:37,612 INFO  org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Interrupted while waiting for queue
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
	at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)
2017-01-19 11:00:37,613 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - Opening proxy : abemasanao.unbelievable-machine.net:8041
2017-01-19 11:00:37,637 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - Opening proxy : acamas.unbelievable-machine.net:8041
2017-01-19 11:00:37,648 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - Opening proxy : baggaley.unbelievable-machine.net:8041
2017-01-19 11:00:37,661 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - Opening proxy : dido.unbelievable-machine.net:8041
2017-01-19 11:00:37,721 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:40514
2017-01-19 11:00:37,728 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
2017-01-19 11:00:37,729 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
2017-01-19 11:00:37,758 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
2017-01-19 11:00:37,775 INFO  org.apache.flink.yarn.YarnJobManager                          - Shutdown completed. Stopping JVM.
2017-01-19 11:00:37,777 INFO  org.apache.flink.runtime.webmonitor.WebRuntimeMonitor         - Removing web dashboard root cache directory /tmp/flink-web-4e068ddd-fe08-4115-8e98-794e7e5b83c6
2017-01-19 11:00:37,777 INFO  org.apache.flink.runtime.webmonitor.WebRuntimeMonitor         - Removing web dashboard jar upload directory /tmp/flink-web-upload-344a9e8c-a495-43cb-a8bb-e28388cb70f2
2017-01-19 11:00:37,778 INFO  org.apache.flink.runtime.webmonitor.StackTraceSampleCoordinator  - Shutting down stack trace sample coordinator.
{code}
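
For completeness, one way to rule out class-name resolution as the failure mode would be to register the involved types with Kryo up front, so Kryo writes integer tags instead of class-name strings. This is only a sketch of that idea (registerKryoType() is an existing ExecutionConfig method; Document and de.dfki.lt.tap.ConceptMention are the types appearing in the traces above), not something this job currently does:
{code:java}
import org.apache.flink.api.scala._

// Sketch only: explicit Kryo registration for the types seen in the traces.
// Registered classes are serialized as integer IDs rather than class-name
// strings, so a corrupted stream would fail differently than
// "Unable to find class: s_business_space".
val env = ExecutionEnvironment.getExecutionEnvironment
env.getConfig.registerKryoType(classOf[Document])
env.getConfig.registerKryoType(classOf[de.dfki.lt.tap.ConceptMention])
{code}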



> KryoSerializer random exception
> -------------------------------
>
>                 Key: FLINK-4719
>                 URL: https://issues.apache.org/jira/browse/FLINK-4719
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.1.1
>            Reporter: Flavio Pompermaier
>            Assignee: Nico Kruber
>              Labels: kryo, serialization
>
> There's a random exception that somehow involves the KryoSerializer when using POJOs in Flink jobs that read large volumes of data.
> It is usually thrown in several places, e.g. (the exceptions reported here may refer to previous versions of Flink):
> {code}
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at main(Jsonizer.java:128))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>         at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>         at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>         ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: java.ttil.HashSet
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>         at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:348)
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> {code}
> {code}
> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>     at java.util.ArrayList.elementData(ArrayList.java:418)
>     at java.util.ArrayList.get(ArrayList.java:431)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> {code}
> {code}
> java.lang.RuntimeException: Cannot instantiate class.
> 	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
> 	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> 	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> 	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> 	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> 	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> 	at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
> 	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> 	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: it.okkam.flink.test.model.pojo.VdhicleEvent
> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> 	at java.lang.Class.forName0(Native Method)
> 	at java.lang.Class.forName(Class.java:348)
> 	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
> 	... 10 more
> {code}
> {code}
> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>         at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>         at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>         at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>         at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> {code}
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
> 	at java.util.ArrayList.elementData(ArrayList.java:418)
> 	at java.util.ArrayList.get(ArrayList.java:431)
> 	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> 	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> 	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
> 	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
> 	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
> 	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
> 	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> 	at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
> 	at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
> 	at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
> 	at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:45)
> 	at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)
> 	at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
> 	at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
> 	at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:64)
> 	at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
> 	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> 	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}


