Posted to user@flink.apache.org by Shannon Carey <sc...@expedia.com> on 2016/10/11 21:40:39 UTC

mapreduce.HadoopOutputFormat config value issue

In Flink 1.1.1, I am seeing what looks like a serialization issue with org.apache.hadoop.conf.Configuration when used with org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat. When I use the mapred.HadoopOutputFormat version, it works just fine.

Specifically, the job fails with "java.lang.UnsupportedOperationException: You must set the ColumnFamily schema using setColumnFamilySchema." I am definitely setting that property, and it appears to be getting serialized, but after the config is deserialized the setting is gone. Anybody have any ideas? In the meantime, I will continue using the "mapred" package.
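
One way to narrow this down is to round-trip the Configuration through Hadoop's Writable serialization (which, as far as I can tell, is how Flink ships the config to the task managers) and check whether the schema entry survives. A minimal sketch, with placeholder table/schema values and an assumed property key:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
    import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat
    import org.apache.hadoop.conf.Configuration

    val conf = new Configuration()
    // "users" and the schema string are placeholders
    CqlBulkOutputFormat.setColumnFamilySchema(conf, "users",
      "CREATE TABLE ks.users (user_id text PRIMARY KEY, value text)")

    // Serialize and deserialize via Hadoop's own Writable mechanism
    val out = new ByteArrayOutputStream()
    conf.write(new DataOutputStream(out))

    val copy = new Configuration(false) // don't load default resources
    copy.readFields(new DataInputStream(new ByteArrayInputStream(out.toByteArray)))

    // Assumed key layout used by setColumnFamilySchema; adjust if your
    // Cassandra version stores it differently
    println(copy.get("cassandra.columnfamily.schema.users"))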

Stack trace:
java.lang.UnsupportedOperationException: You must set the ColumnFamily schema using setColumnFamilySchema.
at org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat.getColumnFamilySchema(CqlBulkOutputFormat.java:184)
at org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.setConfigs(CqlBulkRecordWriter.java:94)
at org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.<init>(CqlBulkRecordWriter.java:74)
at org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat.getRecordWriter(CqlBulkOutputFormat.java:86)
at org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat.getRecordWriter(CqlBulkOutputFormat.java:52)
at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:146)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)


Code that works:

    val insertStmt = s"INSERT INTO $keyspace.$colFamily (user_id, updated_time, value) VALUES (?, ?, ?)"
    val config = new JobConf()

    ConfigHelper.setOutputInitialAddress(config, initialOutputAddress.getHostAddress)

    CqlBulkOutputFormat.setColumnFamilySchema(config, colFamily, cqlTableSchema)
    CqlBulkOutputFormat.setColumnFamilyInsertStatement(config, colFamily, insertStmt)
    CqlBulkOutputFormat.setDeleteSourceOnSuccess(config, true)
    ConfigHelper.setOutputColumnFamily(config,
      keyspace,
      colFamily)
    ConfigHelper.setOutputPartitioner(config, partitionerClass)

    // The mapred wrapper takes the JobConf directly, so the settings above
    // travel with it as-is
    val outputFormat = new mapred.HadoopOutputFormat[Object, java.util.List[ByteBuffer]](
      new CqlBulkOutputFormat,
      config)
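
(For completeness, the format is then attached to the DataSet in the usual way; `pairs` below is a hypothetical DataSet[(Object, java.util.List[ByteBuffer])]:)

    // Hypothetical hookup: write the key/value pairs through the output format
    pairs.output(outputFormat)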

Code that doesn't work:

    val insertStmt = s"INSERT INTO $keyspace.$colFamily (user_id, updated_time, value) VALUES (?, ?, ?)"
    val config = new Configuration()

    ConfigHelper.setOutputInitialAddress(config, initialOutputAddress.getHostAddress)

    CqlBulkOutputFormat.setColumnFamilySchema(config, colFamily, cqlTableSchema)
    CqlBulkOutputFormat.setColumnFamilyInsertStatement(config, colFamily, insertStmt)
    CqlBulkOutputFormat.setDeleteSourceOnSuccess(config, true)
    ConfigHelper.setOutputColumnFamily(config,
      keyspace,
      colFamily)
    ConfigHelper.setOutputPartitioner(config, partitionerClass)

    // The mapreduce wrapper takes a Job; note that Job.getInstance copies
    // `config` into the Job's own configuration
    val hadoopJob: Job = Job.getInstance(config)

    val outputFormat = new mapreduce.HadoopOutputFormat[Object, java.util.List[ByteBuffer]](
      new CqlBulkOutputFormat,
      hadoopJob)
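
One sanity check (a hypothetical addition; the property key below is an assumption about how setColumnFamilySchema stores the schema, and may differ across Cassandra versions) is to confirm the schema actually survives the copy that Job.getInstance(config) makes:

    // Hypothetical check right after building hadoopJob: since Job.getInstance
    // copies `config`, verify the schema is present in the Job's own configuration
    val schemaKey = s"cassandra.columnfamily.schema.$colFamily"
    require(hadoopJob.getConfiguration.get(schemaKey) != null,
      s"ColumnFamily schema missing under key $schemaKey")

If the value is present here, the loss happens later, during serialization or deserialization of the wrapper.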


Re: mapreduce.HadoopOutputFormat config value issue

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Shannon,

I tried to reproduce the problem in a unit test, without success.
My test configures a HadoopOutputFormat object, serializes and deserializes
it, calls open(), and verifies that a configured String property is present
in the getRecordWriter() method.
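
For reference, here is a rough approximation of such a test (a sketch, not the actual test; it checks the deserialized config directly rather than inside getRecordWriter(), assumes the wrapper exposes getConfiguration(), and uses TextOutputFormat as a stand-in):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import org.apache.flink.api.scala.hadoop.mapreduce
    import org.apache.hadoop.io.Text
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

    val job = Job.getInstance()
    job.getConfiguration.set("test.key", "test.value")

    val format = new mapreduce.HadoopOutputFormat[Text, Text](
      new TextOutputFormat[Text, Text], job)

    // Serialize and deserialize the Flink wrapper with Java serialization,
    // as happens when the job graph is shipped to the cluster
    val bytes = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bytes)
    oos.writeObject(format)
    oos.close()

    val restored = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      .readObject().asInstanceOf[mapreduce.HadoopOutputFormat[Text, Text]]

    // The configured property should survive the round trip
    assert(restored.getConfiguration.get("test.key") == "test.value")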

Next, I would try to reproduce the error with Cassandra. Which version are
you using?
Could you also open a JIRA issue for this bug?

Thanks, Fabian
