You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by ya...@apache.org on 2023/05/23 05:15:42 UTC

[incubator-hugegraph-toolchain] branch master updated: fix: Spark loader meets Exception: Class is not registered (#470)

This is an automated email from the ASF dual-hosted git repository.

yangjiaqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git


The following commit(s) were added to refs/heads/master by this push:
     new bd082036 fix: Spark loader meets Exception: Class is not registered (#470)
bd082036 is described below

commit bd0820365506a1f5f9f657440d2735ff342f69a3
Author: alan.zhao <30...@users.noreply.github.com>
AuthorDate: Tue May 23 13:15:37 2023 +0800

    fix: Spark loader meets Exception: Class is not registered (#470)
    
    Co-authored-by: alanzhao <al...@126.com>
    Co-authored-by: Simon Cheung <mi...@apache.org>
---
 .../loader/spark/HugeGraphSparkLoader.java         | 45 +++++++++++++---------
 1 file changed, 27 insertions(+), 18 deletions(-)

diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
index bccd0e5f..228f2db0 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
@@ -17,11 +17,14 @@
 
 package org.apache.hugegraph.loader.spark;
 
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hugegraph.driver.GraphManager;
 import org.apache.hugegraph.loader.builder.EdgeBuilder;
 import org.apache.hugegraph.loader.builder.ElementBuilder;
 import org.apache.hugegraph.loader.builder.VertexBuilder;
 import org.apache.hugegraph.loader.direct.loader.HBaseDirectLoader;
+import org.apache.hugegraph.loader.exception.LoadException;
 import org.apache.hugegraph.loader.executor.LoadContext;
 import org.apache.hugegraph.loader.executor.LoadOptions;
 import org.apache.hugegraph.loader.metrics.LoadDistributeMetrics;
@@ -95,36 +98,42 @@ public class HugeGraphSparkLoader implements Serializable {
         this.executor = Executors.newCachedThreadPool();
     }
 
-    public void load() throws ExecutionException, InterruptedException {
-        LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
-        List<InputStruct> structs = mapping.structs();
-        boolean sinkType = this.loadOptions.sinkType;
-        if (!sinkType) {
-            this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
-        }
-        // kryo序列化
-        SparkConf conf = new SparkConf().set("spark.serializer",
-                                             "org.apache.spark.serializer.KryoSerializer")
-                                        .set("spark.kryo.registrationRequired", "true");
+    private void  registerKryoClasses (SparkConf conf) {
         try {
-            conf.registerKryoClasses(new Class[]{
-                    org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
-                    org.apache.hadoop.hbase.KeyValue.class,
+            conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+                .set("spark.kryo.registrationRequired", "true")
+                .registerKryoClasses(new Class[] {
+                    ImmutableBytesWritable.class,
+                    KeyValue.class,
                     org.apache.spark.sql.types.StructType.class,
                     StructField[].class,
                     StructField.class,
                     org.apache.spark.sql.types.LongType$.class,
                     org.apache.spark.sql.types.Metadata.class,
                     org.apache.spark.sql.types.StringType$.class,
-                    Class.forName(
-                            "org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
-                    Class.forName("scala.reflect.ClassTag$$anon$1"),
+                    org.apache.spark.sql.catalyst.InternalRow.class,
+                    org.apache.spark.sql.catalyst.InternalRow[].class,
+                    Class.forName("org.apache.spark.internal.io." +
+                            "FileCommitProtocol$TaskCommitMessage"),
                     Class.forName("scala.collection.immutable.Set$EmptySet$"),
                     Class.forName("org.apache.spark.sql.types.DoubleType$")
-            });
+                    });
         } catch (ClassNotFoundException e) {
             LOG.error("spark kryo serialized registration failed");
+            throw new LoadException("spark kryo serialized registration failed", e);
         }
+    }
+
+    public void load() throws ExecutionException, InterruptedException {
+        LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
+        List<InputStruct> structs = mapping.structs();
+        boolean sinkType = this.loadOptions.sinkType;
+        if (!sinkType) {
+            this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
+        }
+
+        SparkConf conf = new SparkConf();
+        registerKryoClasses(conf);
         SparkSession session = SparkSession.builder().config(conf).getOrCreate();
         SparkContext sc = session.sparkContext();