Posted to issues@hugegraph.apache.org by "simon824 (via GitHub)" <gi...@apache.org> on 2023/09/22 03:05:43 UTC

[GitHub] [incubator-hugegraph-toolchain] simon824 commented on a diff in pull request #497: feat: Support Spark Sink Connector

simon824 commented on code in PR #497:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/497#discussion_r1333810278


##########
hugegraph-spark-connector/README.md:
##########
@@ -0,0 +1,53 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with this
+work for additional information regarding copyright ownership. The ASF
+licenses this file to You under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+-->
+
+# HugeGraph Spark Connector
+
+## Client Configs
+
+| Params               | Description | Default Value |
+|----------------------|-------------|---------------|
+| `host`               |             |               |
+| `port`               |             |               |
+| `graph`              |             |               |
+| `protocol`           |             |               |
+| `username`           |             |               |
+| `token`              |             |               |
+| `timeout`            |             |               |
+| `max-conn`           |             |               |
+| `max-conn-per-route` |             |               |
+| `trust-store-file`   |             |               |
+| `trust-store-token`  |             |               |
+
+## Graph Data Configs
+
+| Params            | Description | Default Value |
+|-------------------|-------------|---------------|
+| `date-type`       |             |               |
+| `label`           |             |               |
+| `id`              |             |               |
+| `source-name`     |             |               |
+| `target-name`     |             |               |
+| `selected-fields` |             |               |
+| `ignored-fields`  |             |               |
+| `batch-size`      |             |               |
+
+## Common Configs
+
+| Params      | Description | Default Value |
+|-------------|-------------|---------------|
+| `delimiter` |             |               |

Review Comment:
   Maybe we could add a usage example here.
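The tables list the option names but show no usage yet. Below is a minimal sketch in Java of how the write options might be assembled and handed to the connector. The option keys (`host`, `port`, `graph`, `date-type`, `label`, `id`, `batch-size`) and the format class `org.apache.hugegraph.spark.connector.DataSource` come from this PR. Everything else is an assumption for illustration: the values (port `8080`, graph `hugegraph`, `date-type` = `vertex`, `id` = `name`), the hypothetical helper `vertexOptions`, and the use of `SaveMode.Append`. `date-type` is spelled exactly as it appears in the README table above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConnectorOptionsExample {

    // Hypothetical helper: collects the write options for one vertex label.
    // Keys follow the README tables in this PR; all values passed in are assumptions.
    static Map<String, String> vertexOptions(String host, int port, String graph,
                                             String label, String idField, int batchSize) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("host", host);
        opts.put("port", String.valueOf(port));
        opts.put("graph", graph);
        opts.put("date-type", "vertex");  // key spelled as in this PR; "vertex" is an assumed value
        opts.put("label", label);
        opts.put("id", idField);
        opts.put("batch-size", String.valueOf(batchSize));
        return opts;
    }

    public static void main(String[] args) {
        // Assumed values: HugeGraph server on localhost:8080, graph "hugegraph",
        // writing "person" vertices whose id comes from the "name" column.
        Map<String, String> opts =
                vertexOptions("localhost", 8080, "hugegraph", "person", "name", 100);

        // With Spark on the classpath, the map would be handed to the writer, e.g.:
        //   df.write()
        //     .format("org.apache.hugegraph.spark.connector.DataSource")
        //     .options(opts)
        //     .mode(SaveMode.Append)
        //     .save();
        opts.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Keeping the connection settings in one map makes it easy to reuse them for both the vertex and the edge writes, changing only the label-specific keys.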



##########
hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/HGUtils.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.utils;
+
+import java.util.List;
+
+import org.apache.hugegraph.spark.connector.mapping.EdgeMapping;
+import org.apache.hugegraph.spark.connector.mapping.VertexMapping;
+import org.apache.hugegraph.spark.connector.options.HGOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HGUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HGUtils.class);
+
+    public static VertexMapping vertexMappingFromConf(HGOptions hgOptions) {
+        String idField = hgOptions.idField();
+        VertexMapping vertexMapping = new VertexMapping(idField);
+
+        String label = hgOptions.label();
+        vertexMapping.label(label);
+        vertexMapping.batchSize(hgOptions.batchSize());
+        vertexMapping.selectedFields(hgOptions.selectedFields());
+        vertexMapping.ignoredFields(hgOptions.ignoredFields());
+        vertexMapping.check();
+
+        // TODO mappingFields, mappingValues, nullValues, updateStrategies
+        LOG.info("Update VertexMapping: {}", vertexMapping);
+        return vertexMapping;
+    }
+
+    public static EdgeMapping edgeMappingFromConf(HGOptions hgOptions) {
+        List<String> sourceNames = hgOptions.sourceName();
+        List<String> targetNames = hgOptions.targetName();
+        EdgeMapping edgeMapping = new EdgeMapping(sourceNames, targetNames);
+
+        String label = hgOptions.label();
+        edgeMapping.label(label);
+        edgeMapping.batchSize(hgOptions.batchSize());
+        edgeMapping.selectedFields(hgOptions.selectedFields());
+        edgeMapping.ignoredFields(hgOptions.ignoredFields());
+        edgeMapping.check();
+
+        // TODO mappingFields, mappingValues, nullValues, updateStrategies
+        LOG.info("Update EdgeMapping: {}", edgeMapping);
+        return edgeMapping;
+    }
+
+    //public static List<Vertex> buildVertices(InternalRow row, StructType schema,
+    //                                         VertexBuilder builder) {

Review Comment:
   Remove the unused (commented-out) code.



##########
hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/utils/HGBuildUtils.scala:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.utils
+
+import org.apache.hugegraph.spark.connector.builder.{EdgeBuilder, VertexBuilder}
+import org.apache.hugegraph.spark.connector.client.HGLoadContext
+import org.apache.hugegraph.structure.graph.{Edge, Vertex}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, seqAsJavaListConverter}
+
+object HGBuildUtils {
+
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
+  def buildVertices(row: InternalRow, schema: StructType, builder: VertexBuilder): List[Vertex] = {
+    val fields = schema.names
+    val dataTypes = schema.fields.map(field => field.dataType)
+    val values = for {
+      idx <- schema.fields.indices
+    } yield {
+      val value = row.get(idx, dataTypes(idx))
+      if (value.getClass.getSimpleName.equalsIgnoreCase("UTF8String")) value.toString
+      else value

Review Comment:
   Use braces here to keep the code style uniform and improve readability:
   ```suggestion
         if (value.getClass.getSimpleName.equalsIgnoreCase("UTF8String")) {
           value.toString
         } else {
           value
         }
   ```
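Both the original line and the suggested block call `value.getClass` before checking for `null`. `InternalRow.get` returns `null` for a null field, so a row with a missing `age` or `city` (both declared nullable in the example schema) would likely throw a `NullPointerException` here. The sketch below, written in Java, adds a null guard in the same braced style. The class `RowValueConverter` and the `UTF8String` stand-in are hypothetical. The stand-in exists only so the sketch runs without Spark, which works because the check matches on the simple class name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class RowValueConverter {

    // Hypothetical stand-in for Spark's UTF8String so the sketch runs without Spark.
    // The check below matches on the simple class name, as the code under review does.
    static final class UTF8String {
        private final String s;

        UTF8String(String s) {
            this.s = s;
        }

        @Override
        public String toString() {
            return s;
        }
    }

    // Converts a raw row value into a plain Java value. Null fields are returned
    // as-is before getClass() is called, avoiding a NullPointerException.
    static Object toJavaValue(Object value) {
        if (value == null) {
            return null;
        } else if (value.getClass().getSimpleName().equalsIgnoreCase("UTF8String")) {
            return value.toString();
        } else {
            return value;
        }
    }

    public static void main(String[] args) {
        List<Object> raw = Arrays.asList(new UTF8String("marko"), 29, null);
        List<Object> converted = raw.stream()
                .map(RowValueConverter::toJavaValue)
                .collect(Collectors.toList());
        System.out.println(converted);  // prints [marko, 29, null]
    }
}
```

In the Scala file itself, a `match` on the value's type would give the same null-safe behavior, because a `null` value never matches a typed pattern.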



##########
hugegraph-spark-connector/src/main/resources/log4j2.xml:
##########
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with this
+  work for additional information regarding copyright ownership. The ASF
+  licenses this file to You under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+  License for the specific language governing permissions and limitations
+  under the License.
+  -->
+<configuration status="error">
+    <Properties>
+        <property name="log-charset">UTF-8</property>
+    </Properties>
+    <appenders>
+        <Console name="console" target="SYSTEM_OUT">
+            <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+            <PatternLayout charset="${log-charset}" pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%-5p] %c{1.} %x - %m%n" />
+        </Console>
+
+        <RollingRandomAccessFile name="file" fileName="logs/hugegraph-spark-connector.log"
+                                 filePattern="logs/hugegraph-spark-connector-%d{yyyy-MM-dd}-%i.log">
+            <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+            <PatternLayout charset="${log-charset}" pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%-5p] %c{1.} %x - %m%n" />
+            <!-- Trigger after exceeding 1day or 100MB -->
+            <Policies>
+                <SizeBasedTriggeringPolicy size="100MB"/>
+                <TimeBasedTriggeringPolicy interval="1" modulate="true" />
+            </Policies>
+            <!-- Keep max 5 files per day & auto delete after over 1GB or 100 files -->
+            <DefaultRolloverStrategy max="5">
+                <Delete basePath="logs" maxDepth="2">
+                    <IfFileName glob="*/*.log"/>
+                    <!-- Limit log amount & size -->
+                    <IfAny>
+                        <IfAccumulatedFileSize exceeds="1GB" />
+                        <IfAccumulatedFileCount exceeds="100" />
+                    </IfAny>
+                </Delete>
+            </DefaultRolloverStrategy>
+        </RollingRandomAccessFile>
+    </appenders>
+    <loggers>
+        <root level="INFO">
+            <appender-ref ref="console"/>
+            <appender-ref ref="file"/>
+        </root>
+        <logger name="org.apache.cassandra" level="WARN" additivity="false">
+            <appender-ref ref="file"/>
+        </logger>
+        <logger name="org.apache.hadoop" level="WARN" additivity="false">
+            <appender-ref ref="file"/>
+        </logger>
+        <logger name="io.netty" level="WARN" additivity="false">
+            <appender-ref ref="file"/>
+        </logger>
+        <logger name="org.apache.commons" level="WARN" additivity="false">
+            <appender-ref ref="file"/>
+        </logger>
+        <logger name="org.apache.hugegraph" level="INFO" additivity="false">
+            <appender-ref ref="file"/>
+            <appender-ref ref="console"/>
+        </logger>
+    </loggers>
+</configuration>

Review Comment:
   Expected a trailing blank line (newline at end of file) here.



##########
hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGVertexWriter.scala:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.writer
+
+import org.apache.hugegraph.spark.connector.builder.VertexBuilder
+import org.apache.hugegraph.spark.connector.client.HGLoadContext
+import org.apache.hugegraph.spark.connector.mapping.VertexMapping
+import org.apache.hugegraph.spark.connector.options.HGOptions
+import org.apache.hugegraph.spark.connector.utils.HGUtils
+import org.apache.hugegraph.spark.connector.utils.HGBuildUtils
+import org.apache.hugegraph.structure.graph.Vertex
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable.ListBuffer
+
+class HGVertexWriter(schema: StructType, hgOptions: HGOptions) extends DataWriter[InternalRow] {
+
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
+  val context = new HGLoadContext(hgOptions)
+  context.updateSchemaCache()
+  context.setLoadingMode()
+
+  val mapping: VertexMapping = HGUtils.vertexMappingFromConf(hgOptions)
+  val builder = new VertexBuilder(context, mapping)
+
+  private var verticesBuffer: ListBuffer[Vertex] = new ListBuffer()
+
+  var cnt = 0
+
+  override def write(record: InternalRow): Unit = {
+    val vertices = HGBuildUtils.buildVertices(record, schema, builder)
+
+    for (vertex <- vertices) {
+      verticesBuffer.+=(vertex)
+    }
+
+    if (verticesBuffer.size >= hgOptions.batchSize()) {
+      sinkOnce()
+    }
+  }
+
+  private def sinkOnce(): Unit = {
+    verticesBuffer.foreach(e => e.id())
+    LOG.info(s"Writer once: ${verticesBuffer.toList}")
+    val successfulVertices = HGBuildUtils.saveVertices(context, verticesBuffer.toList)
+    val successIds = successfulVertices.map(_.id()).mkString(",")
+    LOG.info(s"Successful ids: ${successIds}")

Review Comment:
   ditto
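`write()` and `sinkOnce()` above follow a buffer-and-flush pattern: records accumulate until `batch-size` is reached, and then one batch is sent. The hunk is cut off, so it does not show whether the buffer is cleared after each send or whether a partial last batch is flushed on commit. The framework-free Java sketch below assumes both. `BatchBuffer` and its methods are hypothetical names, and the `Consumer` stands in for a call such as `HGBuildUtils.saveVertices`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchBuffer<T> {

    private final int batchSize;
    private final Consumer<List<T>> sink;  // stands in for saving one batch to the server
    private final List<T> buffer = new ArrayList<>();

    public BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    // Mirrors write(): buffer the record and flush once the batch is full.
    public void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Mirrors sinkOnce(): hand a copy of the batch to the sink, then clear the
    // buffer so the same records are not sent twice.
    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        List<List<String>> batches = new ArrayList<>();
        BatchBuffer<String> writer = new BatchBuffer<>(2, batches::add);
        for (String name : new String[] {"marko", "vadas", "Josh", "peter", "Bob"}) {
            writer.add(name);
        }
        writer.flush();  // like commit(): send the partial last batch
        System.out.println(batches);  // prints [[marko, vadas], [Josh, peter], [Bob]]
    }
}
```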



##########
hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/SimpleSinkExample.scala:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+
+/*
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asInt().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
+schema.propertyKey("weight").asDouble().ifNotExist().create();
+schema.propertyKey("lang").asText().ifNotExist().create();
+schema.propertyKey("date").asText().ifNotExist().create();
+schema.propertyKey("price").asDouble().ifNotExist().create();
+
+schema.vertexLabel("person")
+        .properties("name", "age", "city")
+//            .primaryKeys("name")
+        .useCustomizeStringId()
+        .nullableKeys("age", "city")
+        .ifNotExist()
+        .create();
+
+schema.vertexLabel("software")
+        .properties("name", "lang", "price")
+        .primaryKeys("name")
+//            .useCustomizeStringId()
+        .ifNotExist()
+        .create();
+
+schema.edgeLabel("knows")
+        .sourceLabel("person")
+        .targetLabel("person")
+        .properties("date", "weight")
+        .ifNotExist()
+        .create();
+
+schema.edgeLabel("created")
+        .sourceLabel("person")
+        .targetLabel("software")
+        .properties("date", "weight")
+        .ifNotExist()
+        .create();
+
+ */
+object SimpleSinkExample {
+
+  // TODO transfer to test
+  def main(args: Array[String]): Unit = {
+    val sparkSession = SparkSession.builder()
+      .master("local[2]")
+      .appName(this.getClass.getSimpleName)
+      .config("spark.ui.port", "19099")
+      .getOrCreate()
+
+    insertVertices1(sparkSession)
+    insertVertices2(sparkSession)
+
+    testInsertEdge1(sparkSession)
+    testInsertEdge2(sparkSession)
+
+    sparkSession.stop()
+  }
+
+  def insertVertices1(sparkSession: SparkSession): Unit = {
+    val df = sparkSession.createDataFrame(Seq(
+      Tuple3("marko", 29, "Beijing"),
+      Tuple3("vadas", 27, "HongKong"),
+      Tuple3("Josh", 32, "Beijing"),
+      Tuple3("peter", 35, "ShangHai"),
+      Tuple3("li,nary", 26, "Wu,han"),
+      Tuple3("Bob", 18, "HangZhou"),
+    )) toDF("name", "age", "city") repartition(2)
+
+    df.show()
+
+    df.write
+      .format("org.apache.hugegraph.spark.connector.DataSource")
+      .option("host", "192.168.34.164")

Review Comment:
   Replace this IP address with `localhost`.



##########
hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/utils/HGBuildUtils.scala:
##########
@@ -0,0 +1,70 @@
+    }
+    LOG.info(s"Fields: ${fields.mkString(", ")}, values: ${values.mkString(", ")}")
+    builder.build(fields, values.toArray).asScala.toList
+  }
+
+  def buildEdges(row: InternalRow, schema: StructType, builder: EdgeBuilder): List[Edge] = {
+    val fields = schema.names
+    val dataTypes = schema.fields.map(field => field.dataType)
+    val values = for {
+      idx <- schema.fields.indices
+    } yield {
+      val value = row.get(idx, dataTypes(idx))
+      if (value.getClass.getSimpleName.equalsIgnoreCase("UTF8String")) value.toString

Review Comment:
   ditto



##########
hugegraph-spark-connector/pom.xml:
##########
@@ -0,0 +1,189 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with this
+  work for additional information regarding copyright ownership. The ASF
+  licenses this file to You under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+  License for the specific language governing permissions and limitations
+  under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.hugegraph</groupId>
+        <artifactId>hugegraph-toolchain</artifactId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <groupId>org.example</groupId>
+    <artifactId>hugegraph-spark-connector</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <spark.verion>3.2.2</spark.verion>
+        <spark.scope>provided</spark.scope>
+        <hugegraph.client.version>1.0.0</hugegraph.client.version>
+        <hugegraph.common.version>1.0.0</hugegraph.common.version>
+        <jersey.version>3.0.3</jersey.version>
+        <jackson.version>2.12.3</jackson.version>
+        <jcommander.version>1.78</jcommander.version>
+        <assembly.dir>${project.basedir}/assembly</assembly.dir>
+        <assembly.descriptor.dir>${assembly.dir}/descriptor</assembly.descriptor.dir>
+        <release.name>${project.artifactId}</release.name>
+        <final.name>apache-${release.name}-incubating-${project.version}</final.name>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.verion}</version>
+            <scope>${spark.scope}</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>jul-to-slf4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>jcl-over-slf4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.12</artifactId>
+            <version>${spark.verion}</version>
+            <scope>${spark.scope}</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-api</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hugegraph</groupId>
+            <artifactId>hugegraph-client</artifactId>
+            <version>${hugegraph.client.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hugegraph</groupId>
+                    <artifactId>hugegraph-common</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hugegraph</groupId>
+            <artifactId>hugegraph-common</artifactId>
+            <version>${hugegraph.common.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- jersey version override -->
+        <dependency>
+            <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-client</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-common</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.inject</groupId>
+            <artifactId>jersey-hk2</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-server</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.jersey.containers</groupId>
+            <artifactId>jersey-container-servlet-core</artifactId>
+            <version>2.34</version>

Review Comment:
   Replace the hard-coded version with a `${xxx.version}` property.



##########
hugegraph-spark-connector/pom.xml:
##########
@@ -0,0 +1,189 @@
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- jackson version override -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.jaxrs</groupId>
+            <artifactId>jackson-jaxrs-base</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.jaxrs</groupId>
+            <artifactId>jackson-jaxrs-json-provider</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-jaxb-annotations</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+
+        <!-- logging version override -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.25</version>

Review Comment:
   ditto



##########
hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/EdgeBuilder.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.builder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.hugegraph.spark.connector.client.HGLoadContext;
+import org.apache.hugegraph.spark.connector.mapping.EdgeMapping;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.graph.Vertex;
+import org.apache.hugegraph.structure.schema.EdgeLabel;
+import org.apache.hugegraph.structure.schema.SchemaLabel;
+import org.apache.hugegraph.structure.schema.VertexLabel;
+import org.apache.hugegraph.util.E;
+
+import com.google.common.collect.ImmutableList;
+
+public class EdgeBuilder extends ElementBuilder<Edge> {
+

Review Comment:
   Can we reuse these classes (Builder, Mapping, etc.) by importing the hugegraph-loader module?



##########
hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGEdgeWriter.scala:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.writer
+
+import org.apache.hugegraph.spark.connector.builder.EdgeBuilder
+import org.apache.hugegraph.spark.connector.client.HGLoadContext
+import org.apache.hugegraph.spark.connector.mapping.EdgeMapping
+import org.apache.hugegraph.spark.connector.options.HGOptions
+import org.apache.hugegraph.spark.connector.utils.HGUtils
+import org.apache.hugegraph.spark.connector.utils.HGBuildUtils
+import org.apache.hugegraph.structure.graph.Edge
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable.ListBuffer
+
+class HGEdgeWriter(schema: StructType, hgOptions: HGOptions) extends DataWriter[InternalRow] {
+
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
+  val context = new HGLoadContext(hgOptions)
+  context.updateSchemaCache()
+  context.setLoadingMode()
+
+  val mapping: EdgeMapping = HGUtils.edgeMappingFromConf(hgOptions)
+  val builder = new EdgeBuilder(context, mapping)
+
+  private var edgesBuffer: ListBuffer[Edge] = new ListBuffer()
+
+  var cnt = 0
+
+  override def write(record: InternalRow): Unit = {
+    val edges = HGBuildUtils.buildEdges(record, schema, builder)
+
+    for (edge <- edges) {
+      edgesBuffer.+=(edge)
+    }
+
+    if (edgesBuffer.size >= hgOptions.batchSize()) {
+      sinkOnce()
+    }
+  }
+
+  private def sinkOnce(): Unit = {
+    LOG.info(s"Writer once: ${edgesBuffer.toList}")
+    val successfulVertices = HGBuildUtils.saveEdges(context, edgesBuffer.toList)
+    val successIds = successfulVertices.map(v => v.id())
+    LOG.info(s"Successful ids: ${successIds}")

Review Comment:
   We don't need to print all the IDs; the number may be very large.
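The comment above concerns `sinkOnce` logging the whole buffer and every successful ID. Below is a minimal sketch of the buffered write path that logs only counts plus a few sample IDs. It assumes `save` behaves like `HGBuildUtils.saveEdges` and returns the edges that were persisted. `FakeEdge`, `BufferedEdgeSink` and `logLines` (a stand-in for `LOG.info`) are hypothetical names used only for illustration. The sketch also clears the buffer after each flush so the same edges are not written twice.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for org.apache.hugegraph.structure.graph.Edge,
// so the sketch compiles without the HugeGraph client on the classpath.
final case class FakeEdge(id: String)

// Sketch of a batching sink: buffer edges, flush once batchSize is reached,
// and log a bounded summary instead of the full list of IDs.
// `save` is assumed to return the edges that were saved successfully.
final class BufferedEdgeSink(batchSize: Int, save: List[FakeEdge] => List[FakeEdge]) {
  private val buffer = ListBuffer.empty[FakeEdge]
  private var total = 0L
  val logLines = ListBuffer.empty[String] // stands in for LOG.info

  def write(edges: Seq[FakeEdge]): Unit = {
    buffer ++= edges
    if (buffer.size >= batchSize) flush()
  }

  def flush(): Unit = if (buffer.nonEmpty) {
    val saved = save(buffer.toList)
    total += saved.size
    // Log counts and at most three sample IDs, never the whole batch.
    val sample = saved.take(3).map(_.id).mkString(", ")
    logLines += s"Saved ${saved.size}/${buffer.size} edges (total $total), sample ids: [$sample]"
    buffer.clear() // avoid re-sending the same edges on the next flush
  }

  def savedTotal: Long = total
}

// Usage: a batch size of 2 with a save function that accepts everything.
val sink = new BufferedEdgeSink(2, es => es)
sink.write(Seq(FakeEdge("a")))
sink.write(Seq(FakeEdge("b"), FakeEdge("c")))
sink.flush() // buffer is already empty, so this is a no-op
```

If the full ID list is still useful for troubleshooting, one option is to emit it only at debug level behind slf4j's `LOG.isDebugEnabled()` check.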



##########
hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGBatchWriterFactory.scala:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.writer
+
+import org.apache.hugegraph.spark.connector.constant.DataTypeEnum
+import org.apache.hugegraph.spark.connector.options.HGOptions
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+class HGBatchWriterFactory(schema: StructType, hgOptions: HGOptions) extends DataWriterFactory {
+
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
+    val dataType = hgOptions.dataType()
+    LOG.info(s"Create a ${dataType} writer, partitionId: ${partitionId}, taskId: ${taskId}")
+    if (dataType == "vertex") new HGVertexWriter(schema, hgOptions)
+    else new HGEdgeWriter(schema, hgOptions)

Review Comment:
   ditto
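In the factory hunk above, `createWriter` treats any `dataType` other than `"vertex"` as an edge, so a misspelled value such as `"vertx"` would silently produce an edge writer. `DataTypeEnum` is imported there, but it is not used in the lines shown. Below is a minimal sketch of a stricter dispatch that rejects unknown values. `DataType`, `Writer`, `VertexWriter`, `EdgeWriter` and `createWriter` are hypothetical stand-ins, not the connector's actual API.

```scala
// Hypothetical model of the two supported data types; the real
// DataTypeEnum's shape is not shown in this diff.
sealed trait DataType
object DataType {
  case object Vertex extends DataType
  case object Edge extends DataType

  // Normalize the raw option and fail fast on anything unexpected.
  def parse(raw: String): DataType = raw.trim.toLowerCase match {
    case "vertex" => Vertex
    case "edge"   => Edge
    case other    => throw new IllegalArgumentException(
      s"Unsupported data type '$other', expected 'vertex' or 'edge'")
  }
}

// Stand-ins for HGVertexWriter / HGEdgeWriter so the sketch is self-contained.
trait Writer { def kind: String }
final class VertexWriter extends Writer { val kind = "vertex" }
final class EdgeWriter extends Writer { val kind = "edge" }

// Exhaustive match: the compiler checks that every DataType is handled.
def createWriter(rawType: String): Writer = DataType.parse(rawType) match {
  case DataType.Vertex => new VertexWriter
  case DataType.Edge   => new EdgeWriter
}
```

Using a sealed type lets the compiler flag a missing case if a third data type is ever added.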



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


For additional commands, e-mail: issues-help@hugegraph.apache.org