Posted to user@flink.apache.org by Andrea Spina <an...@radicalbit.io> on 2019/06/23 18:20:51 UTC
Linkage Error RocksDB and flink-1.6.4
Dear community,
I am running a Flink job backed by RocksDB, version 1.6.4 and Scala 2.11.
At job startup the following exception happens (it's recorded by the Job
Manager).
Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) previously initiated loading for a different type with name "org/rocksdb/DBOptions"
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:126)
For this job, I programmatically set some RocksDB options using the code
appended below. Can anybody help with this? Thank you so much,
Andrea
import org.apache.flink.configuration.MemorySize
import org.apache.flink.contrib.streaming.state.{OptionsFactory, PredefinedOptions, RocksDBStateBackend}
import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}

object ConfigurableRocksDB {

  lazy val columnOptions = new ColumnFamilyOptions() with Serializable
  lazy val tableConfig   = new BlockBasedTableConfig() with Serializable
  lazy val dbOptions     = new DBOptions() with Serializable

  def configureStateBackendRocksDB(properties: FlinkDeployment): RocksDBStateBackend = {
    properties.threadNo.foreach(dbOptions.setIncreaseParallelism)

    properties.blockSize.foreach(bs => tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
    properties.cacheSize.foreach(cs => tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
    properties.cacheIndicesAndFilters.foreach(cif => if (cif) tableConfig.cacheIndexAndFilterBlocks())
    properties.writeBufferSize.foreach(wbs => columnOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))

    columnOptions.setTableFormatConfig(tableConfig)
    properties.writeBufferToMerge.foreach(bm => columnOptions.setMinWriteBufferNumberToMerge(bm))
    properties.writeBufferCount.foreach(bc => columnOptions.setMaxWriteBufferNumber(bc))
    properties.optimizeFilterForHits.foreach(op => if (op) columnOptions.optimizeFiltersForHits())

    val rocksdbConfig = new OptionsFactory() {
      override def createDBOptions(currentOptions: DBOptions): DBOptions = dbOptions
      override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions = columnOptions
    }

    val stateBE =
      new RocksDBStateBackend(properties.checkpointDir.get, properties.checkpointIncremental.getOrElse(false))
    stateBE.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
    stateBE.setOptions(rocksdbConfig)

    stateBE
  }
}
--
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
Re: Linkage Error RocksDB and flink-1.6.4
Posted by Andrea Spina <an...@radicalbit.io>.
Hi Shu Su,
the first point pinpointed exactly the issue I bumped into. I forgot to set
that dependency to "provided". Thank you!
On Mon, Jun 24, 2019 at 05:35 Shu Su <ba...@163.com> wrote:
> Hi Andrea
>
> Actually it's caused by Flink's ClassLoader: Flink's parent classloader
> loads the jar first, and when your user code then uses the same classes,
> the user-code classloader loads them again, which raises the error.
> There are two solutions.
> 1. Add scope "provided" to the Maven pom.xml:
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>     <version>${flink_version}</version>
>     <scope>provided</scope>
> </dependency>
> 2. Set classloader.resolve-order: parent-first in flink-conf.yaml
>
> Hope this will help you.
>
> Thanks,
> Simon
>
--
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
Re: Linkage Error RocksDB and flink-1.6.4
Posted by Shu Su <ba...@163.com>.
Hi Andrea
Actually it's caused by Flink's ClassLoader: Flink's parent classloader loads the jar first, and when your user code then uses the same classes, the user-code classloader loads them again, which raises the error. There are two solutions.
1. Add scope "provided" to the Maven pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>${flink_version}</version>
    <scope>provided</scope>
</dependency>
2. Set classloader.resolve-order: parent-first in flink-conf.yaml
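(For readers building with sbt rather than Maven, as the Scala code in this thread suggests they might be, the equivalent of the Maven "provided" scope would look roughly like the sketch below; the exact module list and `flinkVersion` value are assumptions to adapt to your project.)

```scala
// build.sbt (sketch): mark Flink-provided jars, including the RocksDB
// state backend, as Provided so they are not bundled into the fat jar
// and org.rocksdb classes are loaded only once, by Flink's classloader.
val flinkVersion = "1.6.4" // assumed: match the cluster's Flink version

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                % flinkVersion % Provided,
  "org.apache.flink" %% "flink-streaming-scala"      % flinkVersion % Provided,
  "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion % Provided
)
```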
Hope this will help you.
Thanks,
Simon
Re: Linkage Error RocksDB and flink-1.6.4
Posted by Yun Tang <my...@live.com>.
Hi Andrea
Since I have not written Scala for a while, I wonder why you need to instantiate your ColumnFamilyOptions, BlockBasedTableConfig and DBOptions on the JM side. As far as I can see, you could instantiate them on the TM side, like this:
val rocksdbConfig = new OptionsFactory() {
  override def createDBOptions(currentOptions: DBOptions): DBOptions =
    currentOptions.setIncreaseParallelism(properties.threadNo)

  override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions =
    currentOptions.setWriteBufferSize(MemorySize.parseBytes(properties.writeBufferSize))
}
You just need to serialize the properties via closure to TMs. Hope this could help you.
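(A slightly fuller sketch of that idea, reusing the Option-valued `FlinkDeployment` properties type from Andrea's code — this is not a drop-in implementation, just the pattern: the OptionsFactory captures only the plain, serializable `properties` object, and every native RocksDB object is created lazily on the task managers when Flink invokes the factory's methods.)

```scala
import org.apache.flink.configuration.MemorySize
import org.apache.flink.contrib.streaming.state.{OptionsFactory, RocksDBStateBackend}
import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}

// `FlinkDeployment` is the configuration type from the original post,
// assumed here to be a plain serializable case class of Option fields.
// No RocksDB native object is instantiated on the JM; only `properties`
// travels through the closure to the TMs.
def rocksDBOptionsFactory(properties: FlinkDeployment): OptionsFactory =
  new OptionsFactory {
    override def createDBOptions(currentOptions: DBOptions): DBOptions = {
      properties.threadNo.foreach(currentOptions.setIncreaseParallelism)
      currentOptions
    }

    override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions = {
      val tableConfig = new BlockBasedTableConfig()
      properties.blockSize.foreach(bs => tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
      properties.cacheSize.foreach(cs => tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
      properties.writeBufferSize.foreach(wbs => currentOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))
      currentOptions.setTableFormatConfig(tableConfig)
    }
  }
```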
Best
Yun Tang