You are viewing a plain-text version of this content. The canonical version was linked from the word "here", but that hyperlink is not preserved in this plain-text copy.
Posted to commits@hugegraph.apache.org by ya...@apache.org on 2023/05/23 05:15:42 UTC
[incubator-hugegraph-toolchain] branch master updated: fix:Spark loader meet Exception: Class is not registered (#470)
This is an automated email from the ASF dual-hosted git repository.
yangjiaqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git
The following commit(s) were added to refs/heads/master by this push:
new bd082036 fix:Spark loader meet Exception: Class is not registered (#470)
bd082036 is described below
commit bd0820365506a1f5f9f657440d2735ff342f69a3
Author: alan.zhao <30...@users.noreply.github.com>
AuthorDate: Tue May 23 13:15:37 2023 +0800
fix:Spark loader meet Exception: Class is not registered (#470)
Co-authored-by: alanzhao <al...@126.com>
Co-authored-by: Simon Cheung <mi...@apache.org>
---
.../loader/spark/HugeGraphSparkLoader.java | 45 +++++++++++++---------
1 file changed, 27 insertions(+), 18 deletions(-)
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
index bccd0e5f..228f2db0 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
@@ -17,11 +17,14 @@
package org.apache.hugegraph.loader.spark;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hugegraph.driver.GraphManager;
import org.apache.hugegraph.loader.builder.EdgeBuilder;
import org.apache.hugegraph.loader.builder.ElementBuilder;
import org.apache.hugegraph.loader.builder.VertexBuilder;
import org.apache.hugegraph.loader.direct.loader.HBaseDirectLoader;
+import org.apache.hugegraph.loader.exception.LoadException;
import org.apache.hugegraph.loader.executor.LoadContext;
import org.apache.hugegraph.loader.executor.LoadOptions;
import org.apache.hugegraph.loader.metrics.LoadDistributeMetrics;
@@ -95,36 +98,42 @@ public class HugeGraphSparkLoader implements Serializable {
this.executor = Executors.newCachedThreadPool();
}
- public void load() throws ExecutionException, InterruptedException {
- LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
- List<InputStruct> structs = mapping.structs();
- boolean sinkType = this.loadOptions.sinkType;
- if (!sinkType) {
- this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
- }
- // kryo序列化
- SparkConf conf = new SparkConf().set("spark.serializer",
- "org.apache.spark.serializer.KryoSerializer")
- .set("spark.kryo.registrationRequired", "true");
+ private void registerKryoClasses (SparkConf conf) {
try {
- conf.registerKryoClasses(new Class[]{
- org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
- org.apache.hadoop.hbase.KeyValue.class,
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrationRequired", "true")
+ .registerKryoClasses(new Class[] {
+ ImmutableBytesWritable.class,
+ KeyValue.class,
org.apache.spark.sql.types.StructType.class,
StructField[].class,
StructField.class,
org.apache.spark.sql.types.LongType$.class,
org.apache.spark.sql.types.Metadata.class,
org.apache.spark.sql.types.StringType$.class,
- Class.forName(
- "org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
- Class.forName("scala.reflect.ClassTag$$anon$1"),
+ org.apache.spark.sql.catalyst.InternalRow.class,
+ org.apache.spark.sql.catalyst.InternalRow[].class,
+ Class.forName("org.apache.spark.internal.io." +
+ "FileCommitProtocol$TaskCommitMessage"),
Class.forName("scala.collection.immutable.Set$EmptySet$"),
Class.forName("org.apache.spark.sql.types.DoubleType$")
- });
+ });
} catch (ClassNotFoundException e) {
LOG.error("spark kryo serialized registration failed");
+ throw new LoadException("spark kryo serialized registration failed", e);
}
+ }
+
+ public void load() throws ExecutionException, InterruptedException {
+ LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
+ List<InputStruct> structs = mapping.structs();
+ boolean sinkType = this.loadOptions.sinkType;
+ if (!sinkType) {
+ this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
+ }
+
+ SparkConf conf = new SparkConf();
+ registerKryoClasses(conf);
SparkSession session = SparkSession.builder().config(conf).getOrCreate();
SparkContext sc = session.sparkContext();