Posted to commits@hugegraph.apache.org by ji...@apache.org on 2023/10/16 08:32:53 UTC
[incubator-hugegraph-toolchain] branch master updated: feat(spark): support spark-sink connector for loader (#497)
This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git
The following commit(s) were added to refs/heads/master by this push:
new 71e623d3 feat(spark): support spark-sink connector for loader (#497)
71e623d3 is described below
commit 71e623d369db00a8f1f82e2ab3f5170daee6dbfa
Author: Liu Xiao <42...@users.noreply.github.com>
AuthorDate: Mon Oct 16 16:32:47 2023 +0800
feat(spark): support spark-sink connector for loader (#497)
done for V1
---------
Co-authored-by: imbajin <ji...@apache.org>
---
.github/workflows/spark-connector-ci.yml | 57 +++
.../scripts/dependency/known-dependencies.txt | 19 +-
hugegraph-spark-connector/README.md | 193 ++++++++++
.../assembly/travis/install-hugegraph-from-tar.sh | 35 ++
hugegraph-spark-connector/pom.xml | 257 +++++++++++++
.../spark/connector/builder/EdgeBuilder.java | 200 ++++++++++
.../spark/connector/builder/ElementBuilder.java | 403 +++++++++++++++++++++
.../spark/connector/builder/VertexBuilder.java | 83 +++++
.../spark/connector/client/HGClientHolder.java | 111 ++++++
.../spark/connector/client/HGLoadContext.java | 94 +++++
.../spark/connector/client/SchemaCache.java | 113 ++++++
.../spark/connector/constant/Constants.java | 71 ++++
.../spark/connector/constant/DataTypeEnum.java | 47 +++
.../spark/connector/exception/LoadException.java | 39 ++
.../spark/connector/mapping/EdgeMapping.java | 66 ++++
.../spark/connector/mapping/ElementMapping.java | 143 ++++++++
.../spark/connector/mapping/VertexMapping.java | 51 +++
.../spark/connector/options/HGOptions.java | 266 ++++++++++++++
.../spark/connector/utils/DataTypeUtils.java | 215 +++++++++++
.../hugegraph/spark/connector/utils/DateUtils.java | 48 +++
.../hugegraph/spark/connector/utils/HGUtils.java | 64 ++++
.../src/main/resources/log4j2.xml | 72 ++++
.../hugegraph/spark/connector/DataSource.scala | 55 +++
.../apache/hugegraph/spark/connector/HGTable.scala | 54 +++
.../spark/connector/utils/HGBuildUtils.scala | 78 ++++
.../spark/connector/writer/HGBatchWriter.scala | 43 +++
.../connector/writer/HGBatchWriterFactory.scala | 41 +++
.../spark/connector/writer/HGCommitMessage.scala | 22 ++
.../spark/connector/writer/HGEdgeWriter.scala | 85 +++++
.../spark/connector/writer/HGVertexWriter.scala | 86 +++++
.../spark/connector/writer/HGWriterBuilder.scala | 38 ++
.../spark/connector/builder/EdgeBuilderTest.java | 105 ++++++
.../spark/connector/builder/VertexBuilderTest.java | 177 +++++++++
.../spark/connector/client/HGClientHolderTest.java | 45 +++
.../spark/connector/mapping/EdgeMappingTest.java | 184 ++++++++++
.../spark/connector/mapping/VertexMappingTest.java | 133 +++++++
.../spark/connector/options/HGOptionsTest.java | 156 ++++++++
.../spark/connector/utils/DataTypeUtilsTest.java | 162 +++++++++
.../spark/connector/utils/HGEnvUtils.java | 84 +++++
.../spark/connector/utils/HGUtilsTest.java | 75 ++++
.../spark/connector/SinkExampleTest.scala | 177 +++++++++
pom.xml | 1 +
42 files changed, 4439 insertions(+), 9 deletions(-)
diff --git a/.github/workflows/spark-connector-ci.yml b/.github/workflows/spark-connector-ci.yml
new file mode 100644
index 00000000..efa4639b
--- /dev/null
+++ b/.github/workflows/spark-connector-ci.yml
@@ -0,0 +1,57 @@
+name: "hugegraph-spark-connector-ci"
+
+on:
+ push:
+ branches:
+ - master
+ - 'release-*'
+ paths:
+ - hugegraph-spark-connector/**
+ - hugegraph-dist/**
+ - .github/workflows/**
+ - pom.xml
+ pull_request:
+ paths:
+ - hugegraph-spark-connector/**
+ - hugegraph-dist/**
+ - hugegraph-client/**
+ - .github/workflows/**
+ - pom.xml
+
+jobs:
+ spark-connector-ci:
+ runs-on: ubuntu-latest
+ env:
+ TRAVIS_DIR: hugegraph-spark-connector/assembly/travis
+ VERSION_ID: 1.0.0
+ steps:
+ - name: Install JDK 11
+ uses: actions/setup-java@v3
+ with:
+ java-version: '11'
+ distribution: 'adopt'
+
+ - name: Cache Maven packages
+ uses: actions/cache@v3
+ with:
+ path: ~/.m2
+ key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
+ restore-keys: ${{ runner.os }}-m2
+
+ - name: Checkout
+ uses: actions/checkout@v4
+ with:
+ fetch-depth: 2
+
+ - name: Compile
+ run: |
+ mvn install -pl hugegraph-spark-connector -Dmaven.javadoc.skip=true -DskipTests -ntp
+
+ - name: Prepare env and service
+ run: |
+ $TRAVIS_DIR/install-hugegraph-from-tar.sh $VERSION_ID
+
+ - name: Run test
+ run: |
+ cd hugegraph-spark-connector && ls
+ mvn test
diff --git a/hugegraph-dist/scripts/dependency/known-dependencies.txt b/hugegraph-dist/scripts/dependency/known-dependencies.txt
index 6a67bf29..bacffaed 100644
--- a/hugegraph-dist/scripts/dependency/known-dependencies.txt
+++ b/hugegraph-dist/scripts/dependency/known-dependencies.txt
@@ -1,7 +1,3 @@
-HdrHistogram-2.1.9.jar
-HikariCP-3.2.0.jar
-LatencyUtils-2.0.3.jar
-ST4-4.0.4.jar
accessors-smart-1.2.jar
accessors-smart-2.4.2.jar
aircompressor-0.10.jar
@@ -79,8 +75,8 @@ hadoop-yarn-client-3.3.1.jar
hadoop-yarn-common-3.3.1.jar
hbase-client-2.2.3.jar
hbase-common-2.2.3.jar
-hbase-hadoop-compat-2.2.3.jar
hbase-hadoop2-compat-2.2.3.jar
+hbase-hadoop-compat-2.2.3.jar
hbase-http-2.2.3.jar
hbase-mapreduce-2.2.3.jar
hbase-metrics-2.2.3.jar
@@ -95,7 +91,9 @@ hbase-shaded-miscellaneous-2.2.1.jar
hbase-shaded-netty-2.2.1.jar
hbase-shaded-protobuf-2.2.1.jar
hbase-zookeeper-2.2.3.jar
+HdrHistogram-2.1.9.jar
hibernate-validator-6.0.17.Final.jar
+HikariCP-3.2.0.jar
hive-classification-3.1.3.jar
hive-common-3.1.3.jar
hive-exec-3.1.3.jar
@@ -146,16 +144,17 @@ jakarta.xml.bind-api-2.3.2.jar
jakarta.xml.bind-api-4.0.0-RC2.jar
jamon-runtime-2.4.1.jar
javassist-3.24.0-GA.jar
+javassist-3.25.0-GA.jar
javassist-3.28.0-GA.jar
javax.activation-api-1.2.0.jar
javax.annotation-api-1.3.2.jar
javax.el-3.0.0.jar
javax.el-3.0.1-b12.jar
javax.json-1.0.jar
-javax.servlet-api-3.1.0.jar
-javax.servlet-api-4.0.1.jar
javax.servlet.jsp-2.3.2.jar
javax.servlet.jsp-api-2.3.1.jar
+javax.servlet-api-3.1.0.jar
+javax.servlet-api-4.0.1.jar
javolution-5.5.1.jar
jaxb-api-2.2.11.jar
jaxb-api-2.3.1.jar
@@ -227,13 +226,14 @@ kerby-config-1.0.1.jar
kerby-pkix-1.0.1.jar
kerby-util-1.0.1.jar
kerby-xdr-1.0.1.jar
+LatencyUtils-2.0.3.jar
leveldbjni-all-1.8.jar
libthrift-0.9.3.jar
lightning-csv-8.2.1.jar
listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
+log4j-1.2.17.jar
log4j-1.2-api-2.11.2.jar
log4j-1.2-api-2.17.1.jar
-log4j-1.2.17.jar
log4j-api-2.18.0.jar
log4j-core-2.18.0.jar
log4j-jul-2.11.2.jar
@@ -316,8 +316,9 @@ spring-jdbc-5.1.9.RELEASE.jar
spring-tx-5.1.9.RELEASE.jar
spring-web-5.1.9.RELEASE.jar
spring-webmvc-5.1.9.RELEASE.jar
-stax-api-1.0.1.jar
+ST4-4.0.4.jar
stax2-api-4.2.1.jar
+stax-api-1.0.1.jar
threeten-extra-1.5.0.jar
token-provider-1.0.1.jar
tomcat-embed-core-9.0.24.jar
diff --git a/hugegraph-spark-connector/README.md b/hugegraph-spark-connector/README.md
new file mode 100644
index 00000000..4cfc3b2c
--- /dev/null
+++ b/hugegraph-spark-connector/README.md
@@ -0,0 +1,193 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with this
+work for additional information regarding copyright ownership. The ASF
+licenses this file to You under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+-->
+
+# HugeGraph Spark Connector
+
+[![License](https://img.shields.io/badge/license-Apache%202-0E78BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
+
+HugeGraph Spark Connector is a Spark connector for reading and writing HugeGraph data through the standard Spark data source interface.
+
+## Building
+
+Required:
+
+- Java 8+
+- Maven 3.6+
+
+To build without executing tests:
+
+```bash
+mvn clean package -DskipTests
+```
+
+To build and run the default tests:
+
+```bash
+mvn clean package
+```
+
+## How to use
+
+Suppose we have a graph whose schema is defined as follows:
+
+### Schema
+
+```groovy
+schema.propertyKey("name").asText().ifNotExist().create()
+schema.propertyKey("age").asInt().ifNotExist().create()
+schema.propertyKey("city").asText().ifNotExist().create()
+schema.propertyKey("weight").asDouble().ifNotExist().create()
+schema.propertyKey("lang").asText().ifNotExist().create()
+schema.propertyKey("date").asText().ifNotExist().create()
+schema.propertyKey("price").asDouble().ifNotExist().create()
+
+schema.vertexLabel("person")
+ .properties("name", "age", "city")
+ .useCustomizeStringId()
+ .nullableKeys("age", "city")
+ .ifNotExist()
+ .create()
+
+schema.vertexLabel("software")
+ .properties("name", "lang", "price")
+ .primaryKeys("name")
+ .ifNotExist()
+ .create()
+
+schema.edgeLabel("knows")
+ .sourceLabel("person")
+ .targetLabel("person")
+ .properties("date", "weight")
+ .ifNotExist()
+ .create()
+
+schema.edgeLabel("created")
+ .sourceLabel("person")
+ .targetLabel("software")
+ .properties("date", "weight")
+ .ifNotExist()
+ .create()
+```
+
+Then we can insert graph data through Spark. First, add the dependency to your pom:
+
+```xml
+<dependency>
+ <groupId>org.apache.hugegraph</groupId>
+ <artifactId>hugegraph-spark-connector</artifactId>
+ <version>${revision}</version>
+</dependency>
+```
+
+### Vertex Sink
+
+```scala
+val df = sparkSession.createDataFrame(Seq(
+ Tuple3("marko", 29, "Beijing"),
+ Tuple3("vadas", 27, "HongKong"),
+ Tuple3("Josh", 32, "Beijing"),
+ Tuple3("peter", 35, "ShangHai"),
+ Tuple3("li,nary", 26, "Wu,han"),
+ Tuple3("Bob", 18, "HangZhou"),
+)) toDF("name", "age", "city")
+
+df.show()
+
+df.write
+ .format("org.apache.hugegraph.spark.connector.DataSource")
+ .option("host", "127.0.0.1")
+ .option("port", "8080")
+ .option("graph", "hugegraph")
+ .option("data-type", "vertex")
+ .option("label", "person")
+ .option("id", "name")
+ .option("batch-size", 2)
+ .mode(SaveMode.Overwrite)
+ .save()
+```
+
+### Edge Sink
+
+```scala
+val df = sparkSession.createDataFrame(Seq(
+ Tuple4("marko", "vadas", "20160110", 0.5),
+ Tuple4("peter", "Josh", "20230801", 1.0),
+ Tuple4("peter", "li,nary", "20130220", 2.0)
+)).toDF("source", "target", "date", "weight")
+
+df.show()
+
+df.write
+ .format("org.apache.hugegraph.spark.connector.DataSource")
+ .option("host", "127.0.0.1")
+ .option("port", "8080")
+ .option("graph", "hugegraph")
+ .option("data-type", "edge")
+ .option("label", "knows")
+ .option("source-name", "source")
+ .option("target-name", "target")
+ .option("batch-size", 2)
+ .mode(SaveMode.Overwrite)
+ .save()
+```
+
+### Configs
+
+#### Client Configs
+
+Client Configs are used to configure hugegraph-client.
+
+| Params | Default Value | Description |
+|----------------------|---------------|----------------------------------------------------------------------------------------------|
+| `host` | `localhost` | Address of HugeGraphServer |
+| `port` | `8080` | Port of HugeGraphServer |
+| `graph` | `hugegraph` | Graph space name |
+| `protocol` | `http` | Protocol for sending requests to the server, either `http` or `https` |
+| `username` | `null` | Username of the current graph when HugeGraphServer enables permission authentication |
+| `token` | `null` | Token of the current graph when HugeGraphServer enables authorization authentication |
+| `timeout` | `60` | Timeout in seconds for an insert request to return |
+| `max-conn` | `CPUS * 4` | The maximum number of HTTP connections between HugeClient and HugeGraphServer |
+| `max-conn-per-route` | `CPUS * 2` | The maximum number of HTTP connections for each route between HugeClient and HugeGraphServer |
+| `trust-store-file` | `null` | The client’s certificate file path when the request protocol is https |
+| `trust-store-token` | `null` | The client's certificate password when the request protocol is https |
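+
+For instance, a minimal sketch of a vertex sink over HTTPS with token authentication, reusing the `df` from the Vertex Sink example above (the token and certificate values are placeholders):
+
+```scala
+df.write
+ .format("org.apache.hugegraph.spark.connector.DataSource")
+ .option("host", "127.0.0.1")
+ .option("port", "8080")
+ .option("graph", "hugegraph")
+ .option("protocol", "https")
+ .option("token", "<token>")
+ .option("trust-store-file", "/path/to/client.truststore")
+ .option("trust-store-token", "<trust-store-password>")
+ .option("data-type", "vertex")
+ .option("label", "person")
+ .option("id", "name")
+ .mode(SaveMode.Overwrite)
+ .save()
+```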
+
+#### Graph Data Configs
+
+Graph Data Configs are used to describe the graph data to be written.
+
+| Params | Default Value | Description |
+|-------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `data-type`       |               | Type of the graph data to write, must be `vertex` or `edge` |
+| `label`           |               | Label to which the vertex/edge data to be written belongs |
+| `id`              |               | Column used as the vertex id. Required when the vertex id strategy is CUSTOMIZE; must be empty when the id strategy is PRIMARY_KEY |
+| `source-name`     |               | Column(s) of the input used as the id of the source vertex. When the id strategy of the source vertex is CUSTOMIZE, exactly one column must be specified; when it is PRIMARY_KEY, one or more columns must be specified and are spliced into the generated vertex id. In other words, this option is required regardless of the id strategy |
+| `target-name`     |               | Column(s) used as the id of the target vertex, with the same rules as `source-name` |
+| `selected-fields` |               | Only the selected columns are inserted; the others are skipped. Cannot be used together with `ignored-fields` |
+| `ignored-fields`  |               | The ignored columns are not inserted. Cannot be used together with `selected-fields` |
+| `batch-size`      | `500`         | The number of data items in each batch when importing data |
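+
+For example, a minimal sketch (data values are illustrative) that writes `created` edges from the schema above: the source vertex (`person`) uses a CUSTOMIZE id taken from the `source` column, the target vertex (`software`) uses a PRIMARY_KEY id spliced from the `target` column, and an extra `remark` column is excluded via `ignored-fields`:
+
+```scala
+val df = sparkSession.createDataFrame(Seq(
+ Tuple5("marko", "lop", "20171210", 0.4, "tmp"),
+ Tuple5("Josh", "lop", "20091111", 0.4, "tmp")
+)).toDF("source", "target", "date", "weight", "remark")
+
+df.write
+ .format("org.apache.hugegraph.spark.connector.DataSource")
+ .option("host", "127.0.0.1")
+ .option("port", "8080")
+ .option("graph", "hugegraph")
+ .option("data-type", "edge")
+ .option("label", "created")
+ .option("source-name", "source")
+ .option("target-name", "target")
+ .option("ignored-fields", "remark")
+ .mode(SaveMode.Overwrite)
+ .save()
+```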
+
+#### Common Configs
+
+Common Configs contain some general configurations.
+
+| Params | Default Value | Description |
+|-------------|---------------|---------------------------------------------------------------------------------|
+| `delimiter` | `,`           | Separator used when `source-name`, `target-name`, `selected-fields` or `ignored-fields` contains multiple column names |
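+
+A hedged sketch of the delimiter in action, assuming a DataFrame with two hypothetical columns `first_name` and `last_name` that splice the PRIMARY_KEY id of the source vertex:
+
+```scala
+df.write
+ .format("org.apache.hugegraph.spark.connector.DataSource")
+ // ... host/port/graph/data-type/label options as above ...
+ .option("delimiter", "|")
+ .option("source-name", "first_name|last_name")
+ .option("target-name", "target")
+ .mode(SaveMode.Overwrite)
+ .save()
+```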
+
+## License
+
+Same as HugeGraph, hugegraph-spark-connector is also licensed under the Apache 2.0 License.
\ No newline at end of file
diff --git a/hugegraph-spark-connector/assembly/travis/install-hugegraph-from-tar.sh b/hugegraph-spark-connector/assembly/travis/install-hugegraph-from-tar.sh
new file mode 100755
index 00000000..00f388bf
--- /dev/null
+++ b/hugegraph-spark-connector/assembly/travis/install-hugegraph-from-tar.sh
@@ -0,0 +1,35 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to You under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+set -ev
+
+if [[ $# -ne 1 ]]; then
+ echo "Must input an existing commit id of hugegraph server" && exit 1
+fi
+
+VERSION=$1
+HUGEGRAPH_LINK="https://downloads.apache.org/incubator/hugegraph/${VERSION}/apache-hugegraph-incubating-${VERSION}.tar.gz"
+
+wget "${HUGEGRAPH_LINK}" -q --show-progress || exit 1
+tar zxvf apache-hugegraph-incubating-${VERSION}.tar.gz
+cd apache-hugegraph-incubating-${VERSION}
+
+## start HugeGraphServer with http protocol
+bin/init-store.sh
+bin/start-hugegraph.sh
+
+cd ../
diff --git a/hugegraph-spark-connector/pom.xml b/hugegraph-spark-connector/pom.xml
new file mode 100644
index 00000000..5f595f07
--- /dev/null
+++ b/hugegraph-spark-connector/pom.xml
@@ -0,0 +1,257 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with this
+ work for additional information regarding copyright ownership. The ASF
+ licenses this file to You under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ License for the specific language governing permissions and limitations
+ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hugegraph</groupId>
+ <artifactId>hugegraph-toolchain</artifactId>
+ <version>${revision}</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>hugegraph-spark-connector</artifactId>
+ <packaging>jar</packaging>
+ <name>${project.artifactId}</name>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <spark.version>3.2.2</spark.version>
+ <spark.scope>provided</spark.scope>
+ <hugegraph.client.version>1.0.0</hugegraph.client.version>
+ <jackson.version>2.12.3</jackson.version>
+ <jersey.version>3.0.3</jersey.version>
+ <jersey.container.servlet.core.version>2.34</jersey.container.servlet.core.version>
+ <log4j.version>2.18.0</log4j.version>
+ <slf4j.api.version>1.7.25</slf4j.api.version>
+ <scala.minor.version>2.12.11</scala.minor.version>
+ <assembly.dir>${project.basedir}/assembly</assembly.dir>
+ <assembly.descriptor.dir>${assembly.dir}/descriptor</assembly.descriptor.dir>
+ <release.name>${project.artifactId}</release.name>
+ <final.name>apache-${release.name}-incubating-${project.version}</final.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>${spark.scope}</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>${spark.scope}</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hugegraph</groupId>
+ <artifactId>hugegraph-client</artifactId>
+ <version>${hugegraph.client.version}</version>
+ </dependency>
+
+ <!-- jersey version override -->
+ <dependency>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-client</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-common</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.inject</groupId>
+ <artifactId>jersey-hk2</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-server</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet-core</artifactId>
+ <version>${jersey.container.servlet.core.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- jackson version override -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.jaxrs</groupId>
+ <artifactId>jackson-jaxrs-base</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.jaxrs</groupId>
+ <artifactId>jackson-jaxrs-json-provider</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-jaxb-annotations</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+
+ <!-- logging version override -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.api.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>compile</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.15.2</version>
+ <configuration>
+ <scalaVersion>${scala.minor.version}</scalaVersion>
+ <args>
+ <arg>-target:jvm-1.8</arg>
+ </args>
+ <jvmArgs>
+ <jvmArg>-Xss4096K</jvmArg>
+ </jvmArgs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>scala-compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <configuration>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ <execution>
+ <id>scala-test-compile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.5.3</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <finalName>
+ ${project.artifactId}-${project.version}-jar-with-dependencies
+ </finalName>
+ <appendAssemblyId>false</appendAssemblyId>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.2.0</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/EdgeBuilder.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/EdgeBuilder.java
new file mode 100644
index 00000000..9de225f8
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/EdgeBuilder.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.builder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.hugegraph.spark.connector.client.HGLoadContext;
+import org.apache.hugegraph.spark.connector.mapping.EdgeMapping;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.graph.Vertex;
+import org.apache.hugegraph.structure.schema.EdgeLabel;
+import org.apache.hugegraph.structure.schema.SchemaLabel;
+import org.apache.hugegraph.structure.schema.VertexLabel;
+import org.apache.hugegraph.util.E;
+
+import com.google.common.collect.ImmutableList;
+
+public class EdgeBuilder extends ElementBuilder<Edge> {
+
+ private final EdgeMapping mapping;
+
+ private final EdgeLabel edgeLabel;
+
+ private final VertexLabel sourceLabel;
+
+ private final VertexLabel targetLabel;
+
+ // Used to optimize access performance
+ private VertexIdsIndex vertexIdsIndex;
+
+ public EdgeBuilder(HGLoadContext context, EdgeMapping mapping) {
+ super(context);
+ this.mapping = mapping;
+ this.edgeLabel = this.getEdgeLabel(this.mapping.label());
+ this.sourceLabel = this.getVertexLabel(this.edgeLabel.sourceLabel());
+ this.targetLabel = this.getVertexLabel(this.edgeLabel.targetLabel());
+ // Ensure that the source/target id fields are matched with id strategy
+ this.checkIdFields(this.sourceLabel, this.mapping.sourceFields());
+ this.checkIdFields(this.targetLabel, this.mapping.targetFields());
+
+ this.vertexIdsIndex = null;
+ }
+
+ @Override
+ public EdgeMapping mapping() {
+ return this.mapping;
+ }
+
+ @Override
+ public SchemaLabel schemaLabel() {
+ return this.edgeLabel;
+ }
+
+ @Override
+ protected boolean isIdField(String fieldName) {
+ return this.mapping.sourceFields().contains(fieldName) ||
+ this.mapping.targetFields().contains(fieldName);
+ }
+
+ @Override
+ public List<Edge> build(String[] names, Object[] values) {
+ if (Objects.isNull(this.vertexIdsIndex)) {
+ this.vertexIdsIndex = this.extractVertexIdsIndex(names);
+ }
+ EdgeKVPairs kvPairs = this.newEdgeKVPairs();
+ // Get the source/target vertex's idField and idValue (id or pk strategy)
+ kvPairs.source.extractFromEdge(names, values, this.vertexIdsIndex.sourceIndexes);
+ kvPairs.target.extractFromEdge(names, values, this.vertexIdsIndex.targetIndexes);
+ kvPairs.extractProperties(names, values);
+
+ List<Vertex> sources = kvPairs.source.buildVertices(false, true);
+ List<Vertex> targets = kvPairs.target.buildVertices(false, true);
+ if (sources.isEmpty() || targets.isEmpty()) {
+ return ImmutableList.of();
+ }
+ E.checkArgument(sources.size() == 1 || targets.size() == 1 ||
+ sources.size() == targets.size(),
+ "The elements number of source and target must be: " +
+ "1 to n, n to 1, n to n");
+ int size = Math.max(sources.size(), targets.size());
+ List<Edge> edges = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ Vertex source = i < sources.size() ? sources.get(i) : sources.get(0);
+ Vertex target = i < targets.size() ? targets.get(i) : targets.get(0);
+ Edge edge = new Edge(this.mapping.label());
+ edge.source(source);
+ edge.target(target);
+ this.addProperties(edge, kvPairs.properties);
+ edges.add(edge);
+ }
+ return edges;
+ }
+
+ private EdgeKVPairs newEdgeKVPairs() {
+ EdgeKVPairs kvPairs = new EdgeKVPairs();
+ kvPairs.source = this.newKVPairs(this.sourceLabel);
+ kvPairs.target = this.newKVPairs(this.targetLabel);
+ return kvPairs;
+ }
+
+ private void checkIdFields(VertexLabel vertexLabel, List<String> fields) {
+ if (vertexLabel.idStrategy().isCustomize()) {
+ E.checkArgument(fields.size() == 1,
+ "The source/target field can contains only one " +
+ "column when id strategy is CUSTOMIZE");
+ } else if (vertexLabel.idStrategy().isPrimaryKey()) {
+ E.checkArgument(fields.size() >= 1,
+ "The source/target field must contains some " +
+ "columns when id strategy is PrimaryKey");
+ } else {
+ throw new IllegalArgumentException("Unsupported AUTOMATIC id strategy " +
+ "for hugegraph Spark Connector.");
+ }
+ }
+
+ public class EdgeKVPairs {
+
+ // No general properties
+ private VertexKVPairs source;
+
+ private VertexKVPairs target;
+
+ // General properties
+ private Map<String, Object> properties;
+
+ public void extractProperties(String[] names, Object[] values) {
+ // General properties
+ this.properties = new HashMap<>();
+ Set<String> props = schemaLabel().properties();
+ for (int i = 0; i < names.length; i++) {
+ String fieldName = names[i];
+ Object fieldValue = values[i];
+ if (!retainField(fieldName, fieldValue)) {
+ continue;
+ }
+
+ String key = mapping.mappingField(fieldName);
+ if (isIdField(fieldName) && !props.contains(fieldName) && !props.contains(key)) {
+ continue;
+ }
+
+ Object value = mappingValue(fieldName, fieldValue);
+ this.properties.put(key, value);
+ }
+ }
+ }
+
+ private VertexIdsIndex extractVertexIdsIndex(String[] names) {
+ VertexIdsIndex index = new VertexIdsIndex();
+ index.sourceIndexes = new int[this.mapping.sourceFields().size()];
+ int idx = 0;
+ for (String field : this.mapping.sourceFields()) {
+ for (int pos = 0; pos < names.length; pos++) {
+ String name = names[pos];
+ if (field.equals(name)) {
+ index.sourceIndexes[idx++] = pos;
+ }
+ }
+ }
+
+ index.targetIndexes = new int[this.mapping.targetFields().size()];
+ idx = 0;
+ for (String field : this.mapping.targetFields()) {
+ for (int pos = 0; pos < names.length; pos++) {
+ String name = names[pos];
+ if (field.equals(name)) {
+ index.targetIndexes[idx++] = pos;
+ }
+ }
+ }
+ return index;
+ }
+
+ private static class VertexIdsIndex {
+
+ private int[] sourceIndexes;
+
+ private int[] targetIndexes;
+ }
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/ElementBuilder.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/ElementBuilder.java
new file mode 100644
index 00000000..43089fbd
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/ElementBuilder.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.builder;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CoderResult;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.collections.ListUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hugegraph.spark.connector.client.HGLoadContext;
+import org.apache.hugegraph.spark.connector.client.SchemaCache;
+import org.apache.hugegraph.spark.connector.constant.Constants;
+import org.apache.hugegraph.spark.connector.mapping.ElementMapping;
+import org.apache.hugegraph.spark.connector.utils.DataTypeUtils;
+import org.apache.hugegraph.structure.GraphElement;
+import org.apache.hugegraph.structure.constant.IdStrategy;
+import org.apache.hugegraph.structure.graph.Vertex;
+import org.apache.hugegraph.structure.schema.EdgeLabel;
+import org.apache.hugegraph.structure.schema.PropertyKey;
+import org.apache.hugegraph.structure.schema.SchemaLabel;
+import org.apache.hugegraph.structure.schema.VertexLabel;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.LongEncoding;
+
+import com.google.common.collect.ImmutableList;
+
+public abstract class ElementBuilder<GE extends GraphElement> {
+
+ private final SchemaCache schema;
+
+ // NOTE: CharsetEncoder is not thread safe
+ private final CharsetEncoder encoder;
+
+ private final ByteBuffer buffer;
+
+ public ElementBuilder(HGLoadContext context) {
+ this.schema = context.schemaCache();
+ this.encoder = Constants.CHARSET.newEncoder();
+ this.buffer = ByteBuffer.allocate(Constants.VERTEX_ID_LIMIT);
+ }
+
+ public abstract ElementMapping mapping();
+
+ public abstract SchemaLabel schemaLabel();
+
+ protected abstract boolean isIdField(String fieldName);
+
+ public abstract List<GE> build(String[] names, Object[] values);
+
+ protected VertexKVPairs newKVPairs(VertexLabel vertexLabel) {
+ IdStrategy idStrategy = vertexLabel.idStrategy();
+ if (idStrategy.isCustomize()) {
+ return new VertexIdKVPairs(vertexLabel);
+ } else {
+ assert idStrategy.isPrimaryKey();
+ return new VertexPkKVPairs(vertexLabel);
+ }
+ }
+
+ /**
+ * Retain only the key-value pairs needed by the current vertex or edge
+ */
+ protected boolean retainField(String fieldName, Object fieldValue) {
+ ElementMapping mapping = this.mapping();
+ Set<String> selectedFields = mapping.selectedFields();
+ Set<String> ignoredFields = mapping.ignoredFields();
+ // Retain selected fields or remove ignored fields
+ if (!selectedFields.isEmpty() && !selectedFields.contains(fieldName)) {
+ return false;
+ }
+ return ignoredFields.isEmpty() || !ignoredFields.contains(fieldName);
+ }
+
+ protected void addProperty(GraphElement element, String key, Object value) {
+ value = this.convertPropertyValue(key, value);
+ element.property(key, value);
+ }
+
+ protected void addProperties(GraphElement element, Map<String, Object> properties) {
+ for (Map.Entry<String, Object> entry : properties.entrySet()) {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ value = this.convertPropertyValue(key, value);
+ element.property(key, value);
+ }
+ }
+
+ protected PropertyKey getPropertyKey(String name) {
+ return this.schema.getPropertyKey(name);
+ }
+
+ protected VertexLabel getVertexLabel(String name) {
+ return this.schema.getVertexLabel(name);
+ }
+
+ protected EdgeLabel getEdgeLabel(String name) {
+ return this.schema.getEdgeLabel(name);
+ }
+
+ protected Object mappingValue(String fieldName, Object fieldValue) {
+ if (this.mapping().mappingValues().isEmpty()) {
+ return fieldValue;
+ }
+ String fieldStrValue = String.valueOf(fieldValue);
+ return this.mapping().mappingValue(fieldName, fieldStrValue);
+ }
+
+ private void customizeId(VertexLabel vertexLabel, Vertex vertex,
+ String idField, Object idValue) {
+ E.checkArgumentNotNull(idField, "The vertex id field can't be null");
+ E.checkArgumentNotNull(idValue, "The vertex id value can't be null");
+ IdStrategy idStrategy = vertexLabel.idStrategy();
+ if (idStrategy.isCustomizeString()) {
+ String id = (String) idValue;
+ this.checkVertexIdLength(id);
+ vertex.id(id);
+ } else if (idStrategy.isCustomizeNumber()) {
+ Long id = DataTypeUtils.parseNumber(idField, idValue);
+ vertex.id(id);
+ } else {
+ assert idStrategy.isCustomizeUuid();
+ UUID id = DataTypeUtils.parseUUID(idField, idValue);
+ vertex.id(id);
+ }
+ }
+
+ private Object convertPropertyValue(String key, Object rawValue) {
+ PropertyKey propertyKey = this.getPropertyKey(key);
+ return DataTypeUtils.convert(rawValue, propertyKey);
+ }
+
+ private boolean vertexIdEmpty(VertexLabel vertexLabel, Vertex vertex) {
+ IdStrategy idStrategy = vertexLabel.idStrategy();
+ if (idStrategy.isCustomizeString()) {
+ Object vertexId = vertex.id();
+ return vertexId == null || StringUtils.isEmpty((String) vertexId);
+ }
+ return false;
+ }
+
+ private void checkPrimaryValuesValid(VertexLabel vertexLabel, Object[] primaryValues) {
+ List<String> primaryKeys = vertexLabel.primaryKeys();
+ E.checkArgument(primaryKeys.size() == primaryValues.length,
+ "Missing some primary key values, expect %s, " +
+ "but only got %s for vertex label '%s'",
+ primaryKeys, Arrays.toString(primaryValues), vertexLabel);
+
+ for (int i = 0; i < primaryKeys.size(); i++) {
+ E.checkArgument(primaryValues[i] != null,
+ "Make sure the value of the primary key '%s' is " +
+ "not empty, or check whether the headers or " +
+ "field_mapping are configured correctly",
+ primaryKeys.get(i));
+ }
+ }
+
+ private String spliceVertexId(VertexLabel vertexLabel, Object... primaryValues) {
+ StringBuilder vertexId = new StringBuilder();
+ StringBuilder vertexKeysId = new StringBuilder();
+ for (Object value : primaryValues) {
+ String pkValue;
+ if (value instanceof Number || value instanceof Date) {
+ pkValue = LongEncoding.encodeNumber(value);
+ } else {
+ pkValue = String.valueOf(value);
+ }
+ if (StringUtils.containsAny(pkValue, Constants.SEARCH_LIST)) {
+ pkValue = StringUtils.replaceEach(pkValue,
+ Constants.SEARCH_LIST,
+ Constants.TARGET_LIST);
+ }
+ vertexKeysId.append(pkValue);
+ vertexKeysId.append("!");
+ }
+ vertexId.append(vertexLabel.id()).append(":").append(vertexKeysId);
+ vertexId.deleteCharAt(vertexId.length() - 1);
+ return vertexId.toString();
+ }
+
+ private void checkVertexIdLength(String id) {
+ this.encoder.reset();
+ this.buffer.clear();
+ CoderResult r = this.encoder.encode(CharBuffer.wrap(id.toCharArray()),
+ this.buffer, true);
+ E.checkArgument(r.isUnderflow(),
+ "The vertex id length exceeds limit %s : '%s'",
+ Constants.VERTEX_ID_LIMIT, id);
+ }
+
+ private boolean isEmptyPkValue(Object pkValue) {
+ if (pkValue == null) {
+ return true;
+ }
+ if (pkValue instanceof String) {
+ String pkValueStr = (String) pkValue;
+ return pkValueStr.isEmpty();
+ }
+ return false;
+ }
+
+ public abstract static class VertexKVPairs {
+
+ public final VertexLabel vertexLabel;
+
+ // General properties
+ public Map<String, Object> properties;
+
+ public VertexKVPairs(VertexLabel vertexLabel) {
+ this.vertexLabel = vertexLabel;
+ this.properties = null;
+ }
+
+ public abstract void extractFromVertex(String[] names, Object[] values);
+
+ public abstract void extractFromEdge(String[] names, Object[] values, int[] fieldIndexes);
+
+ public abstract List<Vertex> buildVertices(boolean withProperty, boolean withId);
+ }
+
+ public class VertexIdKVPairs extends VertexKVPairs {
+
+ // The idField(raw field), like: id
+ private String idField;
+
+ // The single idValue(mapped), like: A -> 1
+ private Object idValue;
+
+ public VertexIdKVPairs(VertexLabel vertexLabel) {
+ super(vertexLabel);
+ }
+
+ @Override
+ public void extractFromVertex(String[] names, Object[] values) {
+ // General properties
+ this.properties = new HashMap<>();
+ for (int i = 0; i < names.length; i++) {
+ String fieldName = names[i];
+ Object fieldValue = values[i];
+ if (!retainField(fieldName, fieldValue)) {
+ continue;
+ }
+ if (isIdField(fieldName)) {
+ this.idField = fieldName;
+ this.idValue = mappingValue(fieldName, fieldValue);
+ } else {
+ String key = mapping().mappingField(fieldName);
+ Object value = mappingValue(fieldName, fieldValue);
+ this.properties.put(key, value);
+ }
+ }
+ }
+
+ @Override
+ public void extractFromEdge(String[] names, Object[] values, int[] fieldIndexes) {
+ assert fieldIndexes.length == 1;
+ String fieldName = names[fieldIndexes[0]];
+ Object fieldValue = values[fieldIndexes[0]];
+ this.idField = fieldName;
+ this.idValue = mappingValue(fieldName, fieldValue);
+ }
+
+ @Override
+ public List<Vertex> buildVertices(boolean withProperty, boolean withId) {
+ Vertex vertex = new Vertex(vertexLabel.name());
+ customizeId(vertexLabel, vertex, this.idField, this.idValue);
+ // empty string id ("")
+ if (vertexIdEmpty(vertexLabel, vertex)) {
+ return ImmutableList.of();
+ }
+ if (withProperty) {
+ String key = mapping().mappingField(this.idField);
+ // The id field is also used as a general property
+ if (vertexLabel.properties().contains(key)) {
+ addProperty(vertex, key, this.idValue);
+ }
+ addProperties(vertex, this.properties);
+ }
+ return ImmutableList.of(vertex);
+ }
+ }
+
+ public class VertexPkKVPairs extends VertexKVPairs {
+
+ /*
+ * The primary key names(mapped), allowed multiple
+ * like: [p_name,p_age] -> [name,age]
+ */
+ private List<String> pkNames;
+
+ /*
+ * The primary values(mapped), length is the same as pkNames
+ * like: [m,2] -> [marko,18]
+ */
+ private Object[] pkValues;
+
+ public VertexPkKVPairs(VertexLabel vertexLabel) {
+ super(vertexLabel);
+ }
+
+ @Override
+ public void extractFromVertex(String[] names, Object[] values) {
+ List<String> primaryKeys = this.vertexLabel.primaryKeys();
+ this.pkNames = primaryKeys;
+ this.pkValues = new Object[primaryKeys.size()];
+ // General properties
+ this.properties = new HashMap<>();
+ for (int i = 0; i < names.length; i++) {
+ String fieldName = names[i];
+ Object fieldValue = values[i];
+ if (!retainField(fieldName, fieldValue)) {
+ continue;
+ }
+ String key = mapping().mappingField(fieldName);
+ if (primaryKeys.contains(key)) {
+ // Don't put primary key/values into general properties
+ int index = primaryKeys.indexOf(key);
+ Object pkValue = mappingValue(fieldName, fieldValue);
+ this.pkValues[index] = pkValue;
+ } else {
+ Object value = mappingValue(fieldName, fieldValue);
+ this.properties.put(key, value);
+ }
+ }
+ }
+
+ @Override
+ public void extractFromEdge(String[] names, Object[] values, int[] fieldIndexes) {
+ this.pkNames = new ArrayList<>(fieldIndexes.length);
+ for (int fieldIndex : fieldIndexes) {
+ String fieldName = names[fieldIndex];
+ String mappingField = mapping().mappingField(fieldName);
+ this.pkNames.add(mappingField);
+ }
+ List<String> primaryKeys = this.vertexLabel.primaryKeys();
+ E.checkArgument(ListUtils.isEqualList(this.pkNames, primaryKeys),
+ "Make sure the the primary key fields %s are " +
+ "not empty, or check whether the headers or " +
+ "field_mapping are configured correctly",
+ primaryKeys);
+ this.pkValues = new Object[this.pkNames.size()];
+ for (int i = 0; i < fieldIndexes.length; i++) {
+ String fieldName = names[fieldIndexes[i]];
+ Object fieldValue = values[fieldIndexes[i]];
+ Object pkValue = mappingValue(fieldName, fieldValue);
+ this.pkValues[i] = pkValue;
+ }
+ }
+
+ @Override
+ public List<Vertex> buildVertices(boolean withProperty, boolean withId) {
+ checkPrimaryValuesValid(vertexLabel, this.pkValues);
+ for (int i = 0; i < this.pkNames.size(); i++) {
+ if (isEmptyPkValue(this.pkValues[i])) {
+ return ImmutableList.of();
+ }
+ Object pkValue = convertPropertyValue(this.pkNames.get(i), this.pkValues[i]);
+ this.pkValues[i] = pkValue;
+ }
+ String id = spliceVertexId(vertexLabel, this.pkValues);
+ checkVertexIdLength(id);
+
+ Vertex vertex = new Vertex(vertexLabel.name());
+ // NOTE: withProperty == true means we are parsing a vertex
+ if (withProperty) {
+ for (int i = 0; i < this.pkNames.size(); i++) {
+ addProperty(vertex, this.pkNames.get(i), this.pkValues[i]);
+ }
+ addProperties(vertex, this.properties);
+ }
+ if (withId) {
+ vertex.id(id);
+ } else {
+ vertex.id(null);
+ }
+ return ImmutableList.of(vertex);
+ }
+ }
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/VertexBuilder.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/VertexBuilder.java
new file mode 100644
index 00000000..bb78e571
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/VertexBuilder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.builder;
+
+import java.util.List;
+
+import org.apache.hugegraph.spark.connector.client.HGLoadContext;
+import org.apache.hugegraph.spark.connector.mapping.VertexMapping;
+import org.apache.hugegraph.structure.graph.Vertex;
+import org.apache.hugegraph.structure.schema.SchemaLabel;
+import org.apache.hugegraph.structure.schema.VertexLabel;
+import org.apache.hugegraph.util.E;
+
+public class VertexBuilder extends ElementBuilder<Vertex> {
+
+ private final VertexMapping mapping;
+
+ private final VertexLabel vertexLabel;
+
+ public VertexBuilder(HGLoadContext context, VertexMapping mapping) {
+ super(context);
+ this.mapping = mapping;
+ this.vertexLabel = this.getVertexLabel(this.mapping.label());
+ // Ensure the id field is matched with id strategy
+ this.checkIdField();
+ }
+
+ @Override
+ public VertexMapping mapping() {
+ return this.mapping;
+ }
+
+ @Override
+ public SchemaLabel schemaLabel() {
+ return this.vertexLabel;
+ }
+
+ @Override
+ protected boolean isIdField(String fieldName) {
+ return fieldName.equals(this.mapping.idField());
+ }
+
+ @Override
+ public List<Vertex> build(String[] names, Object[] values) {
+ VertexKVPairs kvPairs = this.newKVPairs(this.vertexLabel);
+ kvPairs.extractFromVertex(names, values);
+ return kvPairs.buildVertices(true, false);
+ }
+
+ private void checkIdField() {
+ String name = this.vertexLabel.name();
+ if (this.vertexLabel.idStrategy().isCustomize()) {
+ E.checkState(this.mapping.idField() != null,
+ "The id field can't be empty or null when " +
+ "id strategy is '%s' for vertex label '%s'",
+ this.vertexLabel.idStrategy(), name);
+ } else if (this.vertexLabel.idStrategy().isPrimaryKey()) {
+ E.checkState(this.mapping.idField() == null,
+ "The id field must be empty or null when " +
+ "id strategy is '%s' for vertex label '%s'",
+ this.vertexLabel.idStrategy(), name);
+ } else {
+ // The id strategy is automatic
+ throw new IllegalArgumentException("Unsupported AUTOMATIC id strategy for " +
+ "hugegraph Spark Connector.");
+ }
+ }
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/HGClientHolder.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/HGClientHolder.java
new file mode 100644
index 00000000..65f08183
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/HGClientHolder.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.client;
+
+import java.io.Serializable;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.driver.HugeClientBuilder;
+import org.apache.hugegraph.exception.ServerException;
+import org.apache.hugegraph.rest.ClientException;
+import org.apache.hugegraph.spark.connector.constant.Constants;
+import org.apache.hugegraph.spark.connector.exception.LoadException;
+import org.apache.hugegraph.spark.connector.options.HGOptions;
+import org.apache.hugegraph.util.E;
+
+public final class HGClientHolder implements Serializable {
+
+ public static HugeClient create(HGOptions options) {
+ String host = options.host();
+ int port = options.port();
+ String graph = options.graph();
+ String username = options.username();
+ String token = options.token();
+ String trustStoreFile = options.trustStoreFile();
+ String trustStoreToken = options.trustStoreToken();
+
+ boolean useHttps = options.protocol().equals(Constants.HTTPS_SCHEMA);
+ String address = host + Constants.COLON_STR + port;
+ if (!host.startsWith(Constants.HTTP_PREFIX) && !host.startsWith(Constants.HTTPS_PREFIX)) {
+ if (useHttps) {
+ address = Constants.HTTPS_PREFIX + address;
+ } else {
+ address = Constants.HTTP_PREFIX + address;
+ }
+ }
+ username = Objects.nonNull(username) ? username : graph;
+ HugeClientBuilder builder;
+ try {
+ builder = HugeClient.builder(address, graph)
+ .configUser(username, token)
+ .configTimeout(options.timeout())
+ .configPool(options.maxConnection(),
+ options.maxConnectionPerRoute());
+ if (useHttps) {
+ String trustFile = trustStoreFile;
+ if (Objects.isNull(trustStoreFile)) {
+ String homePath = System.getProperty(Constants.CONNECTOR_HOME_PATH);
+ E.checkArgument(StringUtils.isNotEmpty(homePath),
+ "The system property 'connector.home.path' " +
+ "can't be null or empty when enable https protocol");
+ trustFile = Paths.get(homePath, Constants.TRUST_STORE_PATH).toString();
+ }
+ trustStoreToken = Objects.isNull(trustStoreToken) ?
+ Constants.DEFAULT_TRUST_STORE_TOKEN : trustStoreToken;
+ builder.configSSL(trustFile, trustStoreToken);
+ }
+ return builder.build();
+ } catch (IllegalStateException e) {
+ String message = e.getMessage();
+ if (Objects.nonNull(message) && message.startsWith("The version")) {
+ throw new LoadException("The version of hugegraph-client and " +
+ "hugegraph-server don't match", e);
+ }
+ throw e;
+ } catch (ServerException e) {
+ String message = e.getMessage();
+ if (Constants.STATUS_UNAUTHORIZED == e.status() ||
+ (Objects.nonNull(message) && message.startsWith("Authentication"))) {
+ throw new LoadException("Incorrect username or password", e);
+ }
+ throw e;
+ } catch (ClientException e) {
+ Throwable cause = e.getCause();
+ if (cause == null || cause.getMessage() == null) {
+ throw e;
+ }
+ String message = cause.getMessage();
+ if (message.contains("Connection refused")) {
+ throw new LoadException(
+ String.format("The service %s:%s is unavailable", host, port), e);
+ } else if (message.contains("java.net.UnknownHostException") ||
+ message.contains("Host name may not be null")) {
+ throw new LoadException(String.format("The host %s is unknown", host), e);
+ } else if (message.contains("connect timed out")) {
+ throw new LoadException(String.format("Connect service %s:%s timeout, " +
+ "please check service is available " +
+ "and network is unobstructed", host, port),
+ e);
+ }
+ throw e;
+ }
+ }
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/HGLoadContext.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/HGLoadContext.java
new file mode 100644
index 00000000..6201a022
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/HGLoadContext.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.client;
+
+import java.io.Serializable;
+
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.exception.ServerException;
+import org.apache.hugegraph.spark.connector.exception.LoadException;
+import org.apache.hugegraph.spark.connector.options.HGOptions;
+import org.apache.hugegraph.structure.constant.GraphMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class HGLoadContext implements Serializable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HGLoadContext.class);
+
+ private final HGOptions options;
+
+ private final HugeClient client;
+
+ private final SchemaCache schemaCache;
+
+ public HGLoadContext(HGOptions options) {
+ this.options = options;
+ this.client = HGClientHolder.create(options);
+ this.schemaCache = new SchemaCache(this.client);
+ }
+
+ public HGOptions options() {
+ return this.options;
+ }
+
+ public HugeClient client() {
+ return this.client;
+ }
+
+ public SchemaCache schemaCache() {
+ return this.schemaCache;
+ }
+
+ public void updateSchemaCache() {
+ assert this.client != null;
+ this.schemaCache.updateAll();
+ }
+
+ public void setLoadingMode() {
+ String graph = this.client.graph().graph();
+ try {
+ this.client.graphs().mode(graph, GraphMode.LOADING);
+ } catch (ServerException e) {
+ if (e.getMessage().contains("Can not deserialize value of type")) {
+ LOG.warn("HugeGraphServer doesn't support loading mode");
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ public void unsetLoadingMode() {
+ try {
+ String graph = this.client.graph().graph();
+ GraphMode mode = this.client.graphs().mode(graph);
+ if (mode.loading()) {
+ this.client.graphs().mode(graph, GraphMode.NONE);
+ }
+ } catch (Exception e) {
+ throw new LoadException(String.format("Failed to unset mode %s for server",
+ GraphMode.LOADING), e);
+ }
+ }
+
+ public void close() {
+ LOG.info("Write load progress successfully");
+ this.client.close();
+ LOG.info("Close HugeClient successfully");
+ }
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/SchemaCache.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/SchemaCache.java
new file mode 100644
index 00000000..3ac9a953
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/SchemaCache.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.client;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.exception.ServerException;
+import org.apache.hugegraph.spark.connector.exception.LoadException;
+import org.apache.hugegraph.structure.schema.EdgeLabel;
+import org.apache.hugegraph.structure.schema.PropertyKey;
+import org.apache.hugegraph.structure.schema.VertexLabel;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public final class SchemaCache {
+
+ private final HugeClient client;
+
+ private final Map<String, PropertyKey> propertyKeys;
+
+ private final Map<String, VertexLabel> vertexLabels;
+
+ private final Map<String, EdgeLabel> edgeLabels;
+
+ public SchemaCache(HugeClient client) {
+ this.client = client;
+ this.propertyKeys = new HashMap<>();
+ this.vertexLabels = new HashMap<>();
+ this.edgeLabels = new HashMap<>();
+ }
+
+ public SchemaCache(@JsonProperty(value = "propertykeys") List<PropertyKey> propertyKeyList,
+ @JsonProperty(value = "vertexlabels") List<VertexLabel> vertexLabelList,
+ @JsonProperty(value = "edgelabels") List<EdgeLabel> edgeLabelList) {
+ this.client = null;
+ this.propertyKeys = new HashMap<>();
+ this.vertexLabels = new HashMap<>();
+ this.edgeLabels = new HashMap<>();
+ propertyKeyList.forEach(pk -> this.propertyKeys.put(pk.name(), pk));
+ vertexLabelList.forEach(vl -> this.vertexLabels.put(vl.name(), vl));
+ edgeLabelList.forEach(el -> this.edgeLabels.put(el.name(), el));
+ }
+
+ public void updateAll() {
+ this.propertyKeys.clear();
+ client.schema().getPropertyKeys().forEach(pk -> this.propertyKeys.put(pk.name(), pk));
+ this.vertexLabels.clear();
+ client.schema().getVertexLabels().forEach(vl -> this.vertexLabels.put(vl.name(), vl));
+ this.edgeLabels.clear();
+ client.schema().getEdgeLabels().forEach(el -> this.edgeLabels.put(el.name(), el));
+ }
+
+ public PropertyKey getPropertyKey(String name) {
+ PropertyKey propertyKey = this.propertyKeys.get(name);
+ if (propertyKey == null) {
+ try {
+ propertyKey = this.client.schema().getPropertyKey(name);
+ } catch (ServerException e) {
+ throw new LoadException(String.format("The property key '%s' doesn't exist", name));
+ }
+ }
+ return propertyKey;
+ }
+
+ public VertexLabel getVertexLabel(String name) {
+ VertexLabel vertexLabel = this.vertexLabels.get(name);
+ if (vertexLabel == null) {
+ try {
+ vertexLabel = this.client.schema().getVertexLabel(name);
+ } catch (ServerException e) {
+ throw new LoadException(String.format("The vertex label '%s' doesn't exist", name));
+ }
+ }
+ return vertexLabel;
+ }
+
+ public EdgeLabel getEdgeLabel(String name) {
+ EdgeLabel edgeLabel = this.edgeLabels.get(name);
+ if (edgeLabel == null) {
+ try {
+ edgeLabel = this.client.schema().getEdgeLabel(name);
+ } catch (ServerException e) {
+ throw new LoadException(String.format("The edge label '%s' doesn't exist", name));
+ }
+ }
+ return edgeLabel;
+ }
+
+ public boolean isEmpty() {
+ return this.propertyKeys.isEmpty() && this.vertexLabels.isEmpty() &&
+ this.edgeLabels.isEmpty();
+ }
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/constant/Constants.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/constant/Constants.java
new file mode 100644
index 00000000..bcf399b2
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/constant/Constants.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.constant;
+
+import java.nio.charset.Charset;
+
+import com.google.common.base.Charsets;
+
+public final class Constants {
+
+ public static final Charset CHARSET = Charsets.UTF_8;
+
+ public static final String HTTP_PREFIX = "http://";
+
+ public static final String HTTPS_PREFIX = "https://";
+
+ public static final String TRUST_STORE_PATH = "conf/hugegraph.truststore";
+
+ public static final String COLON_STR = ":";
+
+ public static final String COMMA_STR = ",";
+
+ public static final int STATUS_UNAUTHORIZED = 401;
+
+ public static final int VERTEX_ID_LIMIT = 128;
+
+ public static final String[] SEARCH_LIST = new String[]{":", "!"};
+
+ public static final String[] TARGET_LIST = new String[]{"`:", "`!"};
+
+ public static final String TIMESTAMP = "timestamp";
+
+ public static final String TIME_ZONE = "GMT+8";
+
+ public static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+ public static final String DEFAULT_TRUST_STORE_TOKEN = "hugegraph";
+
+ public static final String CONNECTOR_HOME_PATH = "connector.home.path";
+
+ public static final String HTTPS_SCHEMA = "https";
+
+ public static final String HTTP_SCHEMA = "http";
+
+ public static final String DEFAULT_HOST = "localhost";
+
+ public static final int DEFAULT_PORT = 8080;
+
+ public static final String DEFAULT_GRAPH = "hugegraph";
+
+ public static final String DEFAULT_PROTOCOL = HTTP_SCHEMA;
+
+ public static final int DEFAULT_TIMEOUT = 60;
+
+ public static final int DEFAULT_BATCH_SIZE = 500;
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/constant/DataTypeEnum.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/constant/DataTypeEnum.java
new file mode 100644
index 00000000..ad0769f3
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/constant/DataTypeEnum.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.constant;
+
+public enum DataTypeEnum {
+
+ VERTEX("vertex"),
+
+ EDGE("edge");
+
+ private final String type;
+
+ DataTypeEnum(String type) {
+ this.type = type;
+ }
+
+ public String dataType() {
+ return this.type;
+ }
+
+ public static boolean isVertex(String type) {
+ return VERTEX.type.equalsIgnoreCase(type);
+ }
+
+ public static boolean isEdge(String type) {
+ return EDGE.type.equalsIgnoreCase(type);
+ }
+
+ public static boolean validDataType(String type) {
+ return VERTEX.name().equalsIgnoreCase(type) || EDGE.name().equalsIgnoreCase(type);
+ }
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/exception/LoadException.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/exception/LoadException.java
new file mode 100644
index 00000000..a5272563
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/exception/LoadException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.exception;
+
+public class LoadException extends RuntimeException {
+
+ private static final long serialVersionUID = 5504623124963497613L;
+
+ public LoadException(String message) {
+ super(message);
+ }
+
+ public LoadException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public LoadException(String message, Object... args) {
+ super(String.format(message, args));
+ }
+
+ public LoadException(String message, Throwable cause, Object... args) {
+ super(String.format(message, args), cause);
+ }
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/EdgeMapping.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/EdgeMapping.java
new file mode 100644
index 00000000..f12c73e2
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/EdgeMapping.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.mapping;
+
+import java.util.List;
+
+import org.apache.hugegraph.spark.connector.constant.DataTypeEnum;
+import org.apache.hugegraph.util.E;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class EdgeMapping extends ElementMapping {
+
+ @JsonProperty("source")
+ private final List<String> sourceFields;
+
+ @JsonProperty("target")
+ private final List<String> targetFields;
+
+ public EdgeMapping(List<String> sourceFields, List<String> targetFields) {
+ this.sourceFields = sourceFields;
+ this.targetFields = targetFields;
+ }
+
+ @Override
+ public DataTypeEnum type() {
+ return DataTypeEnum.EDGE;
+ }
+
+ @Override
+ public void check() throws IllegalArgumentException {
+ super.check();
+ E.checkArgument(this.sourceFields != null && !this.sourceFields.isEmpty(),
+ "The source field of edge label '%s' can't be null or empty", this.label());
+ E.checkArgument(this.targetFields != null && !this.targetFields.isEmpty(),
+ "The target field of edge label '%s' can't be null or empty", this.label());
+ }
+
+ public List<String> sourceFields() {
+ return this.sourceFields;
+ }
+
+ public List<String> targetFields() {
+ return this.targetFields;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("edge-mapping(label=%s)", this.label());
+ }
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/ElementMapping.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/ElementMapping.java
new file mode 100644
index 00000000..4bcdb40e
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/ElementMapping.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.mapping;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hugegraph.spark.connector.constant.DataTypeEnum;
+import org.apache.hugegraph.util.E;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+
+@JsonPropertyOrder({"label"})
+public abstract class ElementMapping implements Serializable {
+
+ @JsonProperty("label")
+ private String label;
+
+ @JsonProperty("field_mapping")
+ private Map<String, String> mappingFields;
+
+ @JsonProperty("value_mapping")
+ private Map<String, Map<String, Object>> mappingValues;
+
+ @JsonProperty("selected")
+ private Set<String> selectedFields;
+
+ @JsonProperty("ignored")
+ private Set<String> ignoredFields;
+
+ @JsonProperty("batch_size")
+ private int batchSize;
+
+ public ElementMapping() {
+ this.mappingFields = new HashMap<>();
+ this.mappingValues = new HashMap<>();
+ this.selectedFields = new HashSet<>();
+ this.ignoredFields = new HashSet<>();
+ this.batchSize = 0;
+ }
+
+ public abstract DataTypeEnum type();
+
+ public void check() throws IllegalArgumentException {
+ this.mappingFields.values().forEach(value -> E.checkArgument(value != null,
+ "The value in field_mapping " +
+ "can't be null"));
+ this.mappingValues.values()
+ .forEach(m -> m.values().forEach(value -> E.checkArgument(value != null,
+ "The value in " +
+ "value_mapping can't be null")));
+ }
+
+ public String label() {
+ return this.label;
+ }
+
+ public void label(String label) {
+ this.label = label;
+ }
+
+ public Map<String, String> mappingFields() {
+ return this.mappingFields;
+ }
+
+ public void mappingFields(Map<String, String> mappingFields) {
+ this.mappingFields = mappingFields;
+ }
+
+ public String mappingField(String fieldName) {
+ if (this.mappingFields.isEmpty()) {
+ return fieldName;
+ }
+ String mappingName = this.mappingFields.get(fieldName);
+ return mappingName != null ? mappingName : fieldName;
+ }
+
+ public Map<String, Map<String, Object>> mappingValues() {
+ return this.mappingValues;
+ }
+
+ public void mappingValues(Map<String, Map<String, Object>> mappingValues) {
+ this.mappingValues = mappingValues;
+ }
+
+ public Object mappingValue(String fieldName, String rawValue) {
+ if (this.mappingValues.isEmpty()) {
+ return rawValue;
+ }
+ Object mappingValue = rawValue;
+ Map<String, Object> values = this.mappingValues.get(fieldName);
+ if (values != null) {
+ Object value = values.get(rawValue);
+ if (value != null) {
+ mappingValue = value;
+ }
+ }
+ return mappingValue;
+ }
+
+ public int batchSize() {
+ return this.batchSize;
+ }
+
+ public void batchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ public Set<String> selectedFields() {
+ return this.selectedFields;
+ }
+
+ public void selectedFields(Set<String> selectedFields) {
+ this.selectedFields = selectedFields;
+ }
+
+ public Set<String> ignoredFields() {
+ return this.ignoredFields;
+ }
+
+ public void ignoredFields(Set<String> ignoredFields) {
+ this.ignoredFields = ignoredFields;
+ }
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/VertexMapping.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/VertexMapping.java
new file mode 100644
index 00000000..9a9928d5
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/VertexMapping.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.mapping;
+
+import org.apache.hugegraph.spark.connector.constant.DataTypeEnum;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class VertexMapping extends ElementMapping {
+
+ @JsonProperty("id")
+ private final String idField;
+
+ public VertexMapping(String idField) {
+ this.idField = idField;
+ }
+
+ @Override
+ public DataTypeEnum type() {
+ return DataTypeEnum.VERTEX;
+ }
+
+ @Override
+ public void check() throws IllegalArgumentException {
+ super.check();
+ }
+
+ public String idField() {
+ return this.idField;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("vertex-mapping(label=%s)", this.label());
+ }
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/options/HGOptions.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/options/HGOptions.java
new file mode 100644
index 00000000..20a5b2cd
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/options/HGOptions.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.options;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hugegraph.spark.connector.constant.Constants;
+import org.apache.hugegraph.spark.connector.constant.DataTypeEnum;
+import org.apache.hugegraph.util.E;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HGOptions implements Serializable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HGOptions.class);
+
+ private static final int CPUS = Runtime.getRuntime().availableProcessors();
+
+ /* Client Configs */
+ public static final String HOST = "host";
+ public static final String PORT = "port";
+ public static final String GRAPH = "graph";
+ public static final String PROTOCOL = "protocol";
+ public static final String USERNAME = "username";
+ public static final String TOKEN = "token";
+ public static final String TIMEOUT = "timeout";
+ public static final String MAX_CONNECTIONS = "max-conn";
+ public static final String MAX_CONNECTIONS_PER_ROUTE = "max-conn-per-route";
+ public static final String TRUST_STORE_FILE = "trust-store-file";
+ public static final String TRUST_STORE_TOKEN = "trust-store-token";
+
+ /* Graph Data Configs */
+ public static final String DATA_TYPE = "data-type";
+ public static final String LABEL = "label";
+ public static final String ID_FIELD = "id";
+ public static final String SOURCE_NAME = "source-name";
+ public static final String TARGET_NAME = "target-name";
+ public static final String SELECTED_FIELDS = "selected-fields";
+ public static final String IGNORED_FIELDS = "ignored-fields";
+ public static final String BATCH_SIZE = "batch-size";
+
+ /* Common Configs */
+ public static final String DELIMITER = "delimiter";
+
+ public Map<String, String> parameters;
+
+ public HGOptions(Map<String, String> options) {
+ parameters = options.entrySet()
+ .stream()
+ .collect(Collectors.toMap(e -> e.getKey().toLowerCase().trim(),
+ Map.Entry::getValue));
+ checkRequiredConf();
+ setDefaultConf();
+ checkFieldsConflict();
+ LOG.info("HugeGraph Spark Connector Configs: {}", parameters);
+ }
+
+ private void checkRequiredConf() {
+ String dataType = parameters.getOrDefault(DATA_TYPE, null);
+        E.checkArgument(DataTypeEnum.validDataType(dataType),
+                        "data-type must be set to either vertex or edge, " +
+                        "but got %s.", dataType);
+
+ String label = parameters.getOrDefault(LABEL, null);
+ E.checkArgument(!StringUtils.isEmpty(label),
+ "Label must be set, but got %s.", label);
+
+ if (DataTypeEnum.isEdge(dataType)) {
+ String sourceNames = parameters.getOrDefault(SOURCE_NAME, null);
+            E.checkArgument(!StringUtils.isEmpty(sourceNames),
+                            "source-name must be set when data-type is edge, but got %s.",
+                            sourceNames);
+            String targetNames = parameters.getOrDefault(TARGET_NAME, null);
+            E.checkArgument(!StringUtils.isEmpty(targetNames),
+                            "target-name must be set when data-type is edge, but got %s.",
+                            targetNames);
+ LOG.info("Edge, Label is {}, source is {}, target is {}.",
+ label, sourceNames, targetNames);
+ } else {
+ String idField = parameters.getOrDefault(ID_FIELD, null);
+ if (Objects.nonNull(idField)) {
+ LOG.info("Vertex, Label is {}, id is {}, id strategy is {}", label, idField,
+ "customize");
+ } else {
+ LOG.info("Vertex, Label is {}, id is {}, id strategy is {}", label, null,
+ "PrimaryKey");
+ }
+ }
+ }
+
+ private void checkFieldsConflict() {
+ Set<String> selectSet = selectedFields();
+ Set<String> ignoreSet = ignoredFields();
+ E.checkArgument(selectSet.isEmpty() || ignoreSet.isEmpty(),
+ "Not allowed to specify selected(%s) and ignored(%s) " +
+ "fields at the same time, at least one of them " +
+ "must be empty", selectSet, ignoreSet);
+ }
+
+ private void setDefaultConf() {
+ setDefaultValueWithMsg(HOST, Constants.DEFAULT_HOST,
+ String.format("Host not set, use default host: %s instead.",
+ Constants.DEFAULT_HOST));
+ setDefaultValueWithMsg(PORT, String.valueOf(Constants.DEFAULT_PORT),
+ String.format("Port not set, use default port: %s instead.",
+ Constants.DEFAULT_PORT));
+ setDefaultValueWithMsg(GRAPH, Constants.DEFAULT_GRAPH,
+ String.format("Graph not set, use default graph: %s instead.",
+ Constants.DEFAULT_GRAPH));
+
+ setDefaultValue(PROTOCOL, Constants.DEFAULT_PROTOCOL);
+ setDefaultValue(USERNAME, null);
+ setDefaultValue(TOKEN, null);
+ setDefaultValue(TIMEOUT, String.valueOf(Constants.DEFAULT_TIMEOUT));
+ setDefaultValue(MAX_CONNECTIONS, String.valueOf(CPUS * 4));
+ setDefaultValue(MAX_CONNECTIONS_PER_ROUTE, String.valueOf(CPUS * 2));
+ setDefaultValue(TRUST_STORE_FILE, null);
+ setDefaultValue(TRUST_STORE_TOKEN, null);
+
+ setDefaultValue(BATCH_SIZE, String.valueOf(Constants.DEFAULT_BATCH_SIZE));
+ setDefaultValue(DELIMITER, Constants.COMMA_STR);
+ setDefaultValue(SELECTED_FIELDS, "");
+ setDefaultValue(IGNORED_FIELDS, "");
+ }
+
+ private void setDefaultValue(String key, String value) {
+ if (!parameters.containsKey(key)) {
+ parameters.put(key, value);
+ }
+ }
+
+ private void setDefaultValueWithMsg(String key, String value, String msg) {
+ if (!parameters.containsKey(key)) {
+ LOG.info(msg);
+ parameters.put(key, value);
+ }
+ }
+
+ public Map<String, String> getAllParameters() {
+ return parameters;
+ }
+
+ public String getConfValue(String confKey) {
+ String lowerConfKey = confKey.toLowerCase();
+ return parameters.getOrDefault(lowerConfKey, null);
+ }
+
+ public String host() {
+ return getConfValue(HOST);
+ }
+
+ public int port() {
+ return Integer.parseInt(getConfValue(PORT));
+ }
+
+ public String graph() {
+ return getConfValue(GRAPH);
+ }
+
+ public String protocol() {
+ return getConfValue(PROTOCOL);
+ }
+
+ public String username() {
+ return getConfValue(USERNAME);
+ }
+
+ public String token() {
+ return getConfValue(TOKEN);
+ }
+
+ public int timeout() {
+ return Integer.parseInt(getConfValue(TIMEOUT));
+ }
+
+ public int maxConnection() {
+ return Integer.parseInt(getConfValue(MAX_CONNECTIONS));
+ }
+
+ public int maxConnectionPerRoute() {
+ return Integer.parseInt(getConfValue(MAX_CONNECTIONS_PER_ROUTE));
+ }
+
+ public String trustStoreFile() {
+ return getConfValue(TRUST_STORE_FILE);
+ }
+
+ public String trustStoreToken() {
+ return getConfValue(TRUST_STORE_TOKEN);
+ }
+
+ public String dataType() {
+ return getConfValue(DATA_TYPE);
+ }
+
+ public String label() {
+ return getConfValue(LABEL);
+ }
+
+ public String idField() {
+ return getConfValue(ID_FIELD);
+ }
+
+ public List<String> sourceName() {
+ return splitStr(getConfValue(SOURCE_NAME));
+ }
+
+ public List<String> targetName() {
+ return splitStr(getConfValue(TARGET_NAME));
+ }
+
+ public Set<String> selectedFields() {
+ String selectStr = getConfValue(SELECTED_FIELDS);
+ if (StringUtils.isEmpty(selectStr)) {
+ return new HashSet<>();
+ }
+ return new HashSet<>(splitStr(selectStr, delimiter()));
+ }
+
+ public Set<String> ignoredFields() {
+ String ignoreStr = getConfValue(IGNORED_FIELDS);
+ if (StringUtils.isEmpty(ignoreStr)) {
+ return new HashSet<>();
+ }
+ return new HashSet<>(splitStr(ignoreStr, delimiter()));
+ }
+
+ public int batchSize() {
+ return Integer.parseInt(getConfValue(BATCH_SIZE));
+ }
+
+ public String delimiter() {
+ return getConfValue(DELIMITER);
+ }
+
+ private List<String> splitStr(String str) {
+ return splitStr(str, Constants.COMMA_STR);
+ }
+
+ private List<String> splitStr(String str, String delimiter) {
+ return Arrays.asList(str.split(delimiter));
+ }
+}
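
Taken together, these options map onto a Spark DataFrame write. A minimal sketch, assuming an existing SparkSession named spark and a pre-created vertex label person whose id is taken from the name column; the format class and option keys come from this module, the sample data is illustrative:

    val df = spark.createDataFrame(Seq(("marko", 29), ("vadas", 27))).toDF("name", "age")

    df.write
      .format("org.apache.hugegraph.spark.connector.DataSource")
      .option("host", "localhost")
      .option("port", "8080")
      .option("graph", "hugegraph")
      .option("data-type", "vertex")
      .option("label", "person")
      .option("id", "name")
      .option("batch-size", "500")
      .mode("append")
      .save()
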
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/DataTypeUtils.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/DataTypeUtils.java
new file mode 100644
index 00000000..317e7568
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/DataTypeUtils.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.utils;
+
+import java.util.Date;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.hugegraph.spark.connector.constant.Constants;
+import org.apache.hugegraph.spark.connector.exception.LoadException;
+import org.apache.hugegraph.structure.constant.Cardinality;
+import org.apache.hugegraph.structure.constant.DataType;
+import org.apache.hugegraph.structure.schema.PropertyKey;
+import org.apache.hugegraph.util.E;
+
+import com.google.common.collect.ImmutableSet;
+
+public final class DataTypeUtils {
+
+ private static final Set<String> ACCEPTABLE_TRUE = ImmutableSet.of(
+ "true", "1", "yes", "y"
+ );
+
+ private static final Set<String> ACCEPTABLE_FALSE = ImmutableSet.of(
+ "false", "0", "no", "n"
+ );
+
+ public static Object convert(Object value, PropertyKey propertyKey) {
+ E.checkArgumentNotNull(value, "The value to be converted can't be null");
+
+ String key = propertyKey.name();
+ DataType dataType = propertyKey.dataType();
+ Cardinality cardinality = propertyKey.cardinality();
+ switch (cardinality) {
+ case SINGLE:
+ return parseSingleValue(key, value, dataType);
+ case SET:
+ case LIST:
+                throw new LoadException("SET and LIST cardinality are not supported yet");
+ default:
+ throw new AssertionError(String.format("Unsupported cardinality: '%s'",
+ cardinality));
+ }
+ }
+
+ private static Object parseSingleValue(String key, Object rawValue, DataType dataType) {
+ // Trim space if raw value is string
+ Object value = rawValue;
+ if (rawValue instanceof String) {
+ value = ((String) rawValue).trim();
+ }
+ if (dataType.isNumber()) {
+ return parseNumber(key, value, dataType);
+ } else if (dataType.isBoolean()) {
+ return parseBoolean(key, value);
+ } else if (dataType.isDate()) {
+ String dateFormat = Constants.DATE_FORMAT;
+ String timeZone = Constants.TIME_ZONE;
+ return parseDate(key, value, dateFormat, timeZone);
+ } else if (dataType.isUUID()) {
+ return parseUUID(key, value);
+ } else if (dataType.isText()) {
+ if (!(rawValue instanceof String)) {
+ value = rawValue.toString();
+ }
+ }
+ E.checkArgument(checkDataType(key, value, dataType),
+ "The value(key='%s') '%s'(%s) is not match with " +
+ "data type %s and can't convert to it",
+ key, value, value.getClass(), dataType);
+ return value;
+ }
+
+ public static long parseNumber(String key, Object rawValue) {
+ if (rawValue instanceof Number) {
+ return ((Number) rawValue).longValue();
+ } else if (rawValue instanceof String) {
+ // trim() is a little time-consuming
+ return parseLong(((String) rawValue).trim());
+ }
+        throw new IllegalArgumentException(String.format("The value(key='%s') must be castable" +
+                                                          " to Long, but got '%s'(%s)", key,
+                                                          rawValue, rawValue.getClass().getName()));
+ }
+
+ public static UUID parseUUID(String key, Object rawValue) {
+ if (rawValue instanceof UUID) {
+ return (UUID) rawValue;
+ } else if (rawValue instanceof String) {
+ String value = ((String) rawValue).trim();
+ if (value.contains("-")) {
+ return UUID.fromString(value);
+ }
+ // UUID represented by hex string
+ E.checkArgument(value.length() == 32,
+ "Invalid UUID value(key='%s') '%s'", key, value);
+ String high = value.substring(0, 16);
+ String low = value.substring(16);
+ return new UUID(Long.parseUnsignedLong(high, 16),
+ Long.parseUnsignedLong(low, 16));
+ }
+ throw new IllegalArgumentException(String.format("Failed to convert value(key='%s') " +
+ "'%s'(%s) to UUID", key, rawValue,
+ rawValue.getClass()));
+ }
+
+ private static Boolean parseBoolean(String key, Object rawValue) {
+ if (rawValue instanceof Boolean) {
+ return (Boolean) rawValue;
+ }
+ if (rawValue instanceof String) {
+ String value = ((String) rawValue).toLowerCase();
+ if (ACCEPTABLE_TRUE.contains(value)) {
+ return true;
+ } else if (ACCEPTABLE_FALSE.contains(value)) {
+ return false;
+ } else {
+ throw new IllegalArgumentException(String.format(
+ "Failed to convert '%s'(key='%s') to Boolean, " +
+ "the acceptable boolean strings are %s or %s",
+                          rawValue, key, ACCEPTABLE_TRUE, ACCEPTABLE_FALSE));
+ }
+ }
+ throw new IllegalArgumentException(String.format("Failed to convert value(key='%s') " +
+ "'%s'(%s) to Boolean", key, rawValue,
+ rawValue.getClass()));
+ }
+
+ private static Number parseNumber(String key, Object value, DataType dataType) {
+ E.checkState(dataType.isNumber(), "The target data type must be number");
+
+ if (dataType.clazz().isInstance(value)) {
+ return (Number) value;
+ }
+ try {
+ switch (dataType) {
+ case BYTE:
+ return Byte.valueOf(value.toString());
+ case INT:
+ return Integer.valueOf(value.toString());
+ case LONG:
+ return parseLong(value.toString());
+ case FLOAT:
+ return Float.valueOf(value.toString());
+ case DOUBLE:
+ return Double.valueOf(value.toString());
+ default:
+ throw new AssertionError(String.format("Number type only contains Byte, " +
+ "Integer, Long, Float, Double, " +
+ "but got %s", dataType.clazz()));
+ }
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(String.format("Failed to convert value(key=%s) " +
+ "'%s'(%s) to Number", key, value,
+ value.getClass()), e);
+ }
+ }
+
+ private static long parseLong(String rawValue) {
+ if (rawValue.startsWith("-")) {
+ return Long.parseLong(rawValue);
+ } else {
+ return Long.parseUnsignedLong(rawValue);
+ }
+ }
+
+ private static Date parseDate(String key, Object value, String dateFormat, String timeZone) {
+ if (value instanceof Date) {
+ return (Date) value;
+ }
+ if (value instanceof Number) {
+ return new Date(((Number) value).longValue());
+ } else if (value instanceof String) {
+ if (Constants.TIMESTAMP.equals(dateFormat)) {
+ try {
+ long timestamp = Long.parseLong((String) value);
+ return new Date(timestamp);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(String.format("Invalid timestamp value " +
+ "'%s'", value));
+ }
+ } else {
+ return DateUtils.parse((String) value, dateFormat, timeZone);
+ }
+ }
+ throw new IllegalArgumentException(String.format("Failed to convert value(key='%s') " +
+ "'%s'(%s) to Date", key, value,
+ value.getClass()));
+ }
+
+ /**
+     * Check whether the value matches the expected data type
+ */
+ private static boolean checkDataType(String key, Object value, DataType dataType) {
+ if (value instanceof Number && dataType.isNumber()) {
+ return parseNumber(key, value, dataType) != null;
+ }
+ return dataType.clazz().isInstance(value);
+ }
+}
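
A couple of hedged usage examples for the public helpers above; the field names "id" and "age" and the literal values are only illustrative:

    import org.apache.hugegraph.spark.connector.utils.DataTypeUtils

    // Both hyphenated and 32-char hex forms are accepted for UUID values
    val u1 = DataTypeUtils.parseUUID("id", "550e8400-e29b-41d4-a716-446655440000")
    val u2 = DataTypeUtils.parseUUID("id", "550e8400e29b41d4a716446655440000")
    // Numeric strings (and Number instances) are converted to long
    val age = DataTypeUtils.parseNumber("age", "29")
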
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/DateUtils.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/DateUtils.java
new file mode 100644
index 00000000..763762df
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/DateUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.utils;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hugegraph.date.SafeDateFormat;
+
+public final class DateUtils {
+
+ private static final Map<String, SafeDateFormat> DATE_FORMATS = new ConcurrentHashMap<>();
+
+ public static Date parse(String source, String df, String timeZone) {
+ SafeDateFormat dateFormat = getDateFormat(df);
+ // parse date with specified timezone
+ dateFormat.setTimeZone(timeZone);
+ return dateFormat.parse(source);
+ }
+
+ private static SafeDateFormat getDateFormat(String df) {
+ SafeDateFormat dateFormat = DATE_FORMATS.get(df);
+ if (dateFormat == null) {
+ dateFormat = new SafeDateFormat(df);
+ SafeDateFormat previous = DATE_FORMATS.putIfAbsent(df, dateFormat);
+ if (previous != null) {
+ dateFormat = previous;
+ }
+ }
+ return dateFormat;
+ }
+}
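
A one-line sketch of how this helper parses a date with the defaults from Constants; the date string itself is just an example:

    import org.apache.hugegraph.spark.connector.constant.Constants
    import org.apache.hugegraph.spark.connector.utils.DateUtils

    val date = DateUtils.parse("2023-10-16 16:32:47", Constants.DATE_FORMAT, Constants.TIME_ZONE)
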
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/HGUtils.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/HGUtils.java
new file mode 100644
index 00000000..93df3bde
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/HGUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.utils;
+
+import java.util.List;
+
+import org.apache.hugegraph.spark.connector.mapping.EdgeMapping;
+import org.apache.hugegraph.spark.connector.mapping.VertexMapping;
+import org.apache.hugegraph.spark.connector.options.HGOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HGUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HGUtils.class);
+
+ public static VertexMapping vertexMappingFromConf(HGOptions hgOptions) {
+ String idField = hgOptions.idField();
+ VertexMapping vertexMapping = new VertexMapping(idField);
+
+ String label = hgOptions.label();
+ vertexMapping.label(label);
+ vertexMapping.batchSize(hgOptions.batchSize());
+ vertexMapping.selectedFields(hgOptions.selectedFields());
+ vertexMapping.ignoredFields(hgOptions.ignoredFields());
+ vertexMapping.check();
+
+ // TODO mappingFields, mappingValues, nullValues, updateStrategies
+ LOG.info("Update VertexMapping: {}", vertexMapping);
+ return vertexMapping;
+ }
+
+ public static EdgeMapping edgeMappingFromConf(HGOptions hgOptions) {
+ List<String> sourceNames = hgOptions.sourceName();
+ List<String> targetNames = hgOptions.targetName();
+ EdgeMapping edgeMapping = new EdgeMapping(sourceNames, targetNames);
+
+ String label = hgOptions.label();
+ edgeMapping.label(label);
+ edgeMapping.batchSize(hgOptions.batchSize());
+ edgeMapping.selectedFields(hgOptions.selectedFields());
+ edgeMapping.ignoredFields(hgOptions.ignoredFields());
+ edgeMapping.check();
+
+ // TODO mappingFields, mappingValues, nullValues, updateStrategies
+ LOG.info("Update EdgeMapping: {}", edgeMapping);
+ return edgeMapping;
+ }
+}
diff --git a/hugegraph-spark-connector/src/main/resources/log4j2.xml b/hugegraph-spark-connector/src/main/resources/log4j2.xml
new file mode 100644
index 00000000..b3a26404
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/resources/log4j2.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with this
+ work for additional information regarding copyright ownership. The ASF
+ licenses this file to You under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ License for the specific language governing permissions and limitations
+ under the License.
+ -->
+<configuration status="error">
+ <Properties>
+ <property name="log-charset">UTF-8</property>
+ </Properties>
+ <appenders>
+ <Console name="console" target="SYSTEM_OUT">
+ <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+ <PatternLayout charset="${log-charset}" pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%-5p] %c{1.} %x - %m%n" />
+ </Console>
+
+ <RollingRandomAccessFile name="file" fileName="logs/hugegraph-spark-connector.log"
+ filePattern="logs/hugegraph-spark-connector-%d{yyyy-MM-dd}-%i.log">
+ <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+ <PatternLayout charset="${log-charset}" pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%-5p] %c{1.} %x - %m%n" />
+ <!-- Trigger after exceeding 1day or 100MB -->
+ <Policies>
+ <SizeBasedTriggeringPolicy size="100MB"/>
+ <TimeBasedTriggeringPolicy interval="1" modulate="true" />
+ </Policies>
+ <!-- Keep max 5 files per day & auto delete after over 1GB or 100 files -->
+ <DefaultRolloverStrategy max="5">
+ <Delete basePath="logs" maxDepth="2">
+ <IfFileName glob="*/*.log"/>
+ <!-- Limit log amount & size -->
+ <IfAny>
+ <IfAccumulatedFileSize exceeds="1GB" />
+ <IfAccumulatedFileCount exceeds="100" />
+ </IfAny>
+ </Delete>
+ </DefaultRolloverStrategy>
+ </RollingRandomAccessFile>
+ </appenders>
+ <loggers>
+ <root level="INFO">
+ <appender-ref ref="console"/>
+ <appender-ref ref="file"/>
+ </root>
+ <logger name="org.apache.cassandra" level="WARN" additivity="false">
+ <appender-ref ref="file"/>
+ </logger>
+ <logger name="org.apache.hadoop" level="WARN" additivity="false">
+ <appender-ref ref="file"/>
+ </logger>
+ <logger name="io.netty" level="WARN" additivity="false">
+ <appender-ref ref="file"/>
+ </logger>
+ <logger name="org.apache.commons" level="WARN" additivity="false">
+ <appender-ref ref="file"/>
+ </logger>
+ <logger name="org.apache.hugegraph" level="INFO" additivity="false">
+ <appender-ref ref="file"/>
+ <appender-ref ref="console"/>
+ </logger>
+ </loggers>
+</configuration>
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/DataSource.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/DataSource.scala
new file mode 100644
index 00000000..1183b7c3
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/DataSource.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector
+
+import org.apache.hugegraph.spark.connector.options.HGOptions
+import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.slf4j.LoggerFactory
+
+import java.util
+
+class DataSource extends TableProvider with DataSourceRegister {
+
+ private val LOG = LoggerFactory.getLogger(this.getClass)
+
+ private var schema: StructType = _
+
+ private var hgOptions: HGOptions = _
+
+ override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+ hgOptions = new HGOptions(options.asCaseSensitiveMap())
+ LOG.info(s"HugeGraph Options: ${hgOptions.getAllParameters}")
+ schema = new StructType()
+ LOG.info(s"Writer infer schema: ${schema}")
+ schema
+ }
+
+ override def getTable(schema: StructType, partitioning: Array[Transform],
+ properties: util.Map[String, String]): Table = {
+ LOG.info(s"Get table schema: ${schema}")
+ new HGTable(schema, hgOptions)
+ }
+
+ // override def supportsExternalMetadata(): Boolean = true
+
+ override def shortName(): String = "HugeGraph"
+}
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/HGTable.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/HGTable.scala
new file mode 100644
index 00000000..28c67951
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/HGTable.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector
+
+import org.apache.hugegraph.spark.connector.options.HGOptions
+import org.apache.hugegraph.spark.connector.writer.HGWriterBuilder
+import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableCapability}
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+import java.util
+import scala.collection.JavaConverters.setAsJavaSetConverter
+
+class HGTable(schema: StructType, hgOptions: HGOptions) extends SupportsWrite {
+
+ private val LOG = LoggerFactory.getLogger(this.getClass)
+
+ override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+ LOG.info(s"User Config Options ${info.options().asCaseSensitiveMap()}")
+ LOG.info(s"Logical Write schema: ${info.schema()}")
+ new HGWriterBuilder(info.schema(), hgOptions)
+ }
+
+ override def name(): String = hgOptions.label()
+
+ override def schema(): StructType = schema
+
+ override def capabilities(): util.Set[TableCapability] =
+ Set(
+ TableCapability.BATCH_READ,
+ TableCapability.BATCH_WRITE,
+ TableCapability.ACCEPT_ANY_SCHEMA,
+ TableCapability.OVERWRITE_BY_FILTER,
+ TableCapability.OVERWRITE_DYNAMIC,
+ TableCapability.STREAMING_WRITE,
+ TableCapability.MICRO_BATCH_READ
+ ).asJava
+}
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/utils/HGBuildUtils.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/utils/HGBuildUtils.scala
new file mode 100644
index 00000000..bac8aa31
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/utils/HGBuildUtils.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.utils
+
+import org.apache.hugegraph.spark.connector.builder.{EdgeBuilder, VertexBuilder}
+import org.apache.hugegraph.spark.connector.client.HGLoadContext
+import org.apache.hugegraph.structure.graph.{Edge, Vertex}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, seqAsJavaListConverter}
+
+object HGBuildUtils {
+
+ private val LOG = LoggerFactory.getLogger(this.getClass)
+
+ def buildVertices(row: InternalRow, schema: StructType, builder: VertexBuilder): List[Vertex] = {
+ val fields = schema.names
+ val dataTypes = schema.fields.map(field => field.dataType)
+ val values = for {
+ idx <- schema.fields.indices
+ } yield {
+ val value = row.get(idx, dataTypes(idx))
+ if (value.getClass.getSimpleName.equalsIgnoreCase("UTF8String")) {
+ value.toString
+ }
+ else {
+ value
+ }
+ }
+ LOG.info(s"Fields: ${fields.mkString(", ")}, values: ${values.mkString(", ")}")
+ builder.build(fields, values.toArray).asScala.toList
+ }
+
+ def buildEdges(row: InternalRow, schema: StructType, builder: EdgeBuilder): List[Edge] = {
+ val fields = schema.names
+ val dataTypes = schema.fields.map(field => field.dataType)
+ val values = for {
+ idx <- schema.fields.indices
+ } yield {
+ val value = row.get(idx, dataTypes(idx))
+ if (value.getClass.getSimpleName.equalsIgnoreCase("UTF8String")) {
+ value.toString
+ }
+ else {
+ value
+ }
+ }
+ LOG.info(s"Fields: ${fields.mkString(", ")}, values: ${values.mkString(", ")}")
+ builder.build(fields, values.toArray).asScala.toList
+ }
+
+ def saveVertices(context: HGLoadContext, vertices: List[Vertex]): List[Vertex] = {
+ val successVertices = context.client().graph().addVertices(vertices.asJava).asScala.toList
+ successVertices
+ }
+
+ def saveEdges(context: HGLoadContext, edges: List[Edge]): List[Edge] = {
+    val successEdges = context.client().graph().addEdges(edges.asJava).asScala.toList
+    successEdges
+ }
+}
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGBatchWriter.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGBatchWriter.scala
new file mode 100644
index 00000000..7dd46ea1
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGBatchWriter.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.writer
+
+import org.apache.hugegraph.spark.connector.options.HGOptions
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+class HGBatchWriter(schema: StructType, hgOptions: HGOptions) extends BatchWrite {
+
+ private val LOG = LoggerFactory.getLogger(this.getClass)
+
+ override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
+ new HGBatchWriterFactory(schema, hgOptions)
+ }
+
+ override def commit(messages: Array[WriterCommitMessage]): Unit = {
+ for (elem <- messages) {
+ val msg = elem.asInstanceOf[HGCommitMessage]
+ LOG.info(s"Commit Message ${msg}")
+ }
+ }
+
+ override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    LOG.error("HugeGraph BatchWriter aborted.")
+ }
+}
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGBatchWriterFactory.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGBatchWriterFactory.scala
new file mode 100644
index 00000000..ef50fc77
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGBatchWriterFactory.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.writer
+
+import org.apache.hugegraph.spark.connector.constant.DataTypeEnum
+import org.apache.hugegraph.spark.connector.options.HGOptions
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+class HGBatchWriterFactory(schema: StructType, hgOptions: HGOptions) extends DataWriterFactory {
+
+ private val LOG = LoggerFactory.getLogger(this.getClass)
+
+ override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
+ val dataType = hgOptions.dataType()
+ LOG.info(s"Create a ${dataType} writer, partitionId: ${partitionId}, taskId: ${taskId}")
+    if (DataTypeEnum.isVertex(dataType)) {
+      new HGVertexWriter(schema, hgOptions)
+    }
+    else {
+      new HGEdgeWriter(schema, hgOptions)
+    }
+ }
+}
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGCommitMessage.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGCommitMessage.scala
new file mode 100644
index 00000000..ab419c64
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGCommitMessage.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.writer
+
+import org.apache.spark.sql.connector.write.WriterCommitMessage
+
+case class HGCommitMessage(successfulMsg: List[String]) extends WriterCommitMessage
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGEdgeWriter.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGEdgeWriter.scala
new file mode 100644
index 00000000..6f930ddd
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGEdgeWriter.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.writer
+
+import org.apache.hugegraph.spark.connector.builder.EdgeBuilder
+import org.apache.hugegraph.spark.connector.client.HGLoadContext
+import org.apache.hugegraph.spark.connector.mapping.EdgeMapping
+import org.apache.hugegraph.spark.connector.options.HGOptions
+import org.apache.hugegraph.spark.connector.utils.HGUtils
+import org.apache.hugegraph.spark.connector.utils.HGBuildUtils
+import org.apache.hugegraph.structure.graph.Edge
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable.ListBuffer
+
+class HGEdgeWriter(schema: StructType, hgOptions: HGOptions) extends DataWriter[InternalRow] {
+
+ private val LOG = LoggerFactory.getLogger(this.getClass)
+
+ val context = new HGLoadContext(hgOptions)
+ context.updateSchemaCache()
+ context.setLoadingMode()
+
+ val mapping: EdgeMapping = HGUtils.edgeMappingFromConf(hgOptions)
+ val builder = new EdgeBuilder(context, mapping)
+
+ private val edgesBuffer: ListBuffer[Edge] = new ListBuffer()
+
+ var cnt = 0
+
+ override def write(record: InternalRow): Unit = {
+ val edges = HGBuildUtils.buildEdges(record, schema, builder)
+
+ edgesBuffer ++= edges
+
+ if (edgesBuffer.size >= hgOptions.batchSize()) {
+ sinkOnce()
+ }
+ }
+
+ // Flush the buffered edges to the graph and reset the buffer
+ private def sinkOnce(): Unit = {
+ LOG.info(s"Write once: ${edgesBuffer.size} edges")
+ val successfulEdges = HGBuildUtils.saveEdges(context, edgesBuffer.toList)
+ cnt += successfulEdges.length
+ edgesBuffer.clear()
+ }
+
+ override def commit(): WriterCommitMessage = {
+ if (edgesBuffer.nonEmpty) {
+ sinkOnce()
+ }
+ context.unsetLoadingMode()
+ HGCommitMessage(List("Success cnt: " + cnt))
+ }
+
+ override def abort(): Unit = {
+ context.unsetLoadingMode()
+ LOG.error("Load Task abort.")
+ }
+
+ override def close(): Unit = {
+ context.close()
+ }
+}
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGVertexWriter.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGVertexWriter.scala
new file mode 100644
index 00000000..e5823eb3
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGVertexWriter.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.writer
+
+import org.apache.hugegraph.spark.connector.builder.VertexBuilder
+import org.apache.hugegraph.spark.connector.client.HGLoadContext
+import org.apache.hugegraph.spark.connector.mapping.VertexMapping
+import org.apache.hugegraph.spark.connector.options.HGOptions
+import org.apache.hugegraph.spark.connector.utils.HGUtils
+import org.apache.hugegraph.spark.connector.utils.HGBuildUtils
+import org.apache.hugegraph.structure.graph.Vertex
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable.ListBuffer
+
+class HGVertexWriter(schema: StructType, hgOptions: HGOptions) extends DataWriter[InternalRow] {
+
+ private val LOG = LoggerFactory.getLogger(this.getClass)
+
+ val context = new HGLoadContext(hgOptions)
+ context.updateSchemaCache()
+ context.setLoadingMode()
+
+ val mapping: VertexMapping = HGUtils.vertexMappingFromConf(hgOptions)
+ val builder = new VertexBuilder(context, mapping)
+
+ private val verticesBuffer: ListBuffer[Vertex] = new ListBuffer()
+
+ var cnt = 0
+
+ override def write(record: InternalRow): Unit = {
+ val vertices = HGBuildUtils.buildVertices(record, schema, builder)
+
+ verticesBuffer ++= vertices
+
+ if (verticesBuffer.size >= hgOptions.batchSize()) {
+ sinkOnce()
+ }
+ }
+
+ // Flush the buffered vertices to the graph and reset the buffer
+ private def sinkOnce(): Unit = {
+ LOG.info(s"Write once: ${verticesBuffer.size} vertices")
+ val successfulVertices = HGBuildUtils.saveVertices(context, verticesBuffer.toList)
+ cnt += successfulVertices.length
+ verticesBuffer.clear()
+ }
+
+ override def commit(): WriterCommitMessage = {
+ if (verticesBuffer.nonEmpty) {
+ sinkOnce()
+ }
+ context.unsetLoadingMode()
+ HGCommitMessage(List("Success cnt: " + cnt))
+ }
+
+ override def abort(): Unit = {
+ context.unsetLoadingMode()
+ LOG.error("Load Task abort.")
+ }
+
+ override def close(): Unit = {
+ context.close()
+ }
+}
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGWriterBuilder.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGWriterBuilder.scala
new file mode 100644
index 00000000..ac770b1c
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGWriterBuilder.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.writer
+
+import org.apache.hugegraph.spark.connector.options.HGOptions
+import org.apache.spark.sql.connector.write.{BatchWrite, SupportsOverwrite, SupportsTruncate, WriteBuilder}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+class HGWriterBuilder(schema: StructType, hgOptions: HGOptions)
+ extends WriteBuilder with SupportsOverwrite with SupportsTruncate {
+
+ private val LOG = LoggerFactory.getLogger(this.getClass)
+
+ override def buildForBatch(): BatchWrite = {
+ new HGBatchWriter(schema, hgOptions)
+ }
+
+ override def overwrite(filters: Array[Filter]): WriteBuilder = {
+ new HGWriterBuilder(schema, hgOptions)
+ }
+}
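This builder is what Spark reaches when a DataFrame is written through the connector's DataSource entry point; buildForBatch hands back the HGBatchWriter that drives the per-partition writers above. For reference, a minimal end-to-end write from the user side looks like the sketch below (mirroring SinkExampleTest further down in this change); it assumes a HugeGraph server at 127.0.0.1:8080 with the "person" vertex label already defined.

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("hugegraph-sink-example")
  .getOrCreate()

// Sample rows matching the "person" vertex label; "name" is used as the customized id
val df = spark.createDataFrame(Seq(
  ("marko", 29, "Beijing"),
  ("vadas", 27, "HongKong")
)).toDF("name", "age", "city")

df.write
  .format("org.apache.hugegraph.spark.connector.DataSource")
  .option("host", "127.0.0.1")
  .option("port", "8080")
  .option("graph", "hugegraph")
  .option("data-type", "vertex")
  .option("label", "person")
  .option("id", "name")
  .option("batch-size", 2)
  .mode(SaveMode.Overwrite)
  .save()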
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/builder/EdgeBuilderTest.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/builder/EdgeBuilderTest.java
new file mode 100644
index 00000000..c6e05c4a
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/builder/EdgeBuilderTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.builder;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.spark.connector.client.HGLoadContext;
+import org.apache.hugegraph.spark.connector.mapping.EdgeMapping;
+import org.apache.hugegraph.spark.connector.options.HGOptions;
+import org.apache.hugegraph.spark.connector.utils.HGEnvUtils;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.schema.EdgeLabel;
+import org.apache.hugegraph.testutil.Assert;
+import org.jetbrains.annotations.NotNull;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class EdgeBuilderTest {
+
+ private static HugeClient client;
+
+ @BeforeClass
+ public static void setUp() {
+ HGEnvUtils.createEnv();
+ client = HGEnvUtils.getHugeClient();
+ }
+
+ @Test
+ public void testEdgeIdFields() {
+ HGLoadContext context = getEdgeLoadContext();
+
+ EdgeMapping edgeMapping = new EdgeMapping(Collections.singletonList("v1-name"),
+ Collections.singletonList("v2-name"));
+ edgeMapping.label("created");
+ EdgeBuilder edgeBuilder = new EdgeBuilder(context, edgeMapping);
+
+ EdgeLabel edgeLabel = (EdgeLabel) edgeBuilder.schemaLabel();
+ Assert.assertEquals("person", edgeLabel.sourceLabel());
+ Assert.assertEquals("software", edgeLabel.targetLabel());
+ Assert.assertTrue(edgeBuilder.isIdField("v1-name"));
+ Assert.assertTrue(edgeBuilder.isIdField("v2-name"));
+ Assert.assertEquals("edge-mapping(label=created)",
+ edgeBuilder.mapping().toString());
+ }
+
+ @Test
+ public void testEdgeBuild() {
+ HGLoadContext context = getEdgeLoadContext();
+
+ EdgeMapping edgeMapping = new EdgeMapping(Collections.singletonList("v1-name"),
+ Collections.singletonList("name"));
+ edgeMapping.label("created");
+ EdgeBuilder edgeBuilder = new EdgeBuilder(context, edgeMapping);
+
+ String[] names = {"v1-name", "name", "date", "weight"};
+ Object[] values = {"josh", "lop", "2009-11-11", 0.4};
+
+ List<Edge> edges = edgeBuilder.build(names, values);
+ Assert.assertEquals(1, edges.size());
+ Edge edge = edges.get(0);
+ Assert.assertEquals("josh", edge.sourceId());
+ Assert.assertEquals("2:lop", edge.targetId());
+ }
+
+ @NotNull
+ private static HGLoadContext getEdgeLoadContext() {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("host", HGEnvUtils.DEFAULT_HOST);
+ configs.put("port", HGEnvUtils.DEFAULT_PORT);
+
+ configs.put("data-type", "edge");
+ configs.put("label", "created");
+ configs.put("source-name", "v1-name");
+ configs.put("target-name", "name");
+ HGOptions options = new HGOptions(configs);
+ HGLoadContext context = new HGLoadContext(options);
+ context.updateSchemaCache();
+ return context;
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ HGEnvUtils.destroyEnv();
+ }
+}
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/builder/VertexBuilderTest.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/builder/VertexBuilderTest.java
new file mode 100644
index 00000000..559bb031
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/builder/VertexBuilderTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.builder;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.spark.connector.client.HGLoadContext;
+import org.apache.hugegraph.spark.connector.mapping.VertexMapping;
+import org.apache.hugegraph.spark.connector.options.HGOptions;
+import org.apache.hugegraph.spark.connector.utils.HGEnvUtils;
+import org.apache.hugegraph.structure.graph.Vertex;
+import org.apache.hugegraph.structure.schema.VertexLabel;
+import org.apache.hugegraph.testutil.Assert;
+import org.jetbrains.annotations.NotNull;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class VertexBuilderTest {
+
+ private static HugeClient client;
+
+ @BeforeClass
+ public static void setUp() {
+ HGEnvUtils.createEnv();
+ client = HGEnvUtils.getHugeClient();
+ }
+
+ @Test
+ public void testCustomizeIdFields() {
+ HGLoadContext context = getCustomizeIdVertexContext();
+
+ VertexMapping vertexMapping = new VertexMapping("name");
+ vertexMapping.label("person");
+ VertexBuilder vertexBuilder = new VertexBuilder(context, vertexMapping);
+
+ VertexLabel vertexLabel = (VertexLabel) vertexBuilder.schemaLabel();
+ Assert.assertTrue(vertexLabel.idStrategy().isCustomizeString());
+
+ Assert.assertTrue(vertexBuilder.isIdField("name"));
+ System.out.println(vertexBuilder.mapping());
+ Assert.assertEquals("vertex-mapping(label=person)",
+ vertexBuilder.mapping().toString());
+ }
+
+ @Test
+ public void testCustomizeWithNullIdField() {
+ HGLoadContext context = getCustomizeIdVertexContext();
+
+ VertexMapping vertexMapping = new VertexMapping(null);
+ vertexMapping.label("person");
+ Assert.assertThrows(IllegalStateException.class, () -> {
+ new VertexBuilder(context, vertexMapping);
+ });
+ }
+
+ @NotNull
+ private static HGLoadContext getCustomizeIdVertexContext() {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("host", HGEnvUtils.DEFAULT_HOST);
+ configs.put("port", HGEnvUtils.DEFAULT_PORT);
+
+ configs.put("data-type", "vertex");
+ configs.put("label", "person");
+ configs.put("id", "name");
+ HGOptions options = new HGOptions(configs);
+ HGLoadContext context = new HGLoadContext(options);
+ context.updateSchemaCache();
+ return context;
+ }
+
+ @Test
+ public void testCustomizeBuildVertex() {
+ HGLoadContext context = getCustomizeIdVertexContext();
+
+ VertexMapping vertexMapping = new VertexMapping("name");
+ vertexMapping.label("person");
+ VertexBuilder vertexBuilder = new VertexBuilder(context, vertexMapping);
+
+ String[] names = {"name", "age", "city"};
+ Object[] values = {"marko", "29", "Beijing"};
+
+ List<Vertex> vertices = vertexBuilder.build(names, values);
+ Assert.assertEquals(1, vertices.size());
+ Vertex vertex = vertices.get(0);
+ Assert.assertEquals("marko", vertex.id());
+ Assert.assertEquals("person", vertex.label());
+ Map<String, Object> properties = vertex.properties();
+ for (int i = 0; i < names.length; i++) {
+ String name = names[i];
+ Assert.assertTrue(properties.containsKey(name));
+ Assert.assertEquals(values[i], String.valueOf(properties.get(name)));
+ }
+ }
+
+ @Test
+ public void testPrimaryIdFields() {
+ HGLoadContext context = getPrimaryIdVertexContext();
+ VertexMapping vertexMapping = new VertexMapping("name");
+ vertexMapping.label("software");
+ Assert.assertThrows(IllegalStateException.class, () -> {
+ new VertexBuilder(context, vertexMapping);
+ });
+ }
+
+ @Test
+ public void testPrimaryWithNullIdField() {
+ HGLoadContext context = getPrimaryIdVertexContext();
+ VertexMapping vertexMapping = new VertexMapping(null);
+ vertexMapping.label("software");
+ VertexBuilder vertexBuilder = new VertexBuilder(context, vertexMapping);
+ VertexLabel vertexLabel = (VertexLabel) vertexBuilder.schemaLabel();
+ Assert.assertTrue(vertexLabel.idStrategy().isPrimaryKey());
+ Assert.assertEquals("vertex-mapping(label=software)",
+ vertexBuilder.mapping().toString());
+ }
+
+ @NotNull
+ private static HGLoadContext getPrimaryIdVertexContext() {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("host", HGEnvUtils.DEFAULT_HOST);
+ configs.put("port", HGEnvUtils.DEFAULT_PORT);
+
+ configs.put("data-type", "vertex");
+ configs.put("label", "software");
+ HGOptions options = new HGOptions(configs);
+ HGLoadContext context = new HGLoadContext(options);
+ context.updateSchemaCache();
+ return context;
+ }
+
+ @Test
+ public void testPrimaryBuildVertex() {
+ HGLoadContext context = getCustomizeIdVertexContext();
+
+ VertexMapping vertexMapping = new VertexMapping(null);
+ vertexMapping.label("software");
+ VertexBuilder vertexBuilder = new VertexBuilder(context, vertexMapping);
+
+ String[] names = {"name", "lang", "price"};
+ Object[] values = {"lop", "java", "328.0"};
+ List<Vertex> vertices = vertexBuilder.build(names, values);
+ Assert.assertEquals(1, vertices.size());
+ Vertex vertex = vertices.get(0);
+ Assert.assertNull(vertex.id());
+ Assert.assertEquals("software", vertex.label());
+ Map<String, Object> properties = vertex.properties();
+ for (int i = 0; i < names.length; i++) {
+ String name = names[i];
+ Assert.assertTrue(properties.containsKey(name));
+ Assert.assertEquals(values[i], String.valueOf(properties.get(name)));
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ HGEnvUtils.destroyEnv();
+ }
+}
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/client/HGClientHolderTest.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/client/HGClientHolderTest.java
new file mode 100644
index 00000000..c0009418
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/client/HGClientHolderTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.client;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.spark.connector.options.HGOptions;
+import org.apache.hugegraph.spark.connector.utils.HGEnvUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HGClientHolderTest {
+
+ @Test
+ public void testHGClientHolder() {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("host", HGEnvUtils.DEFAULT_HOST);
+ configs.put("port", HGEnvUtils.DEFAULT_PORT);
+
+ configs.put("data-type", "vertex");
+ configs.put("label", "person");
+ configs.put("id", "name");
+ HGOptions options = new HGOptions(configs);
+ HugeClient client = HGClientHolder.create(options);
+ Assert.assertNotNull(client);
+ client.close();
+ }
+}
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/mapping/EdgeMappingTest.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/mapping/EdgeMappingTest.java
new file mode 100644
index 00000000..cf141058
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/mapping/EdgeMappingTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.mapping;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hugegraph.testutil.Assert;
+import org.junit.Test;
+
+public class EdgeMappingTest {
+
+ @Test
+ public void testBasicEdgeMapping() {
+ List<String> sourceFields = Arrays.asList("id", "name", "age");
+ List<String> targetFields = Arrays.asList("id", "ISBN", "price");
+ EdgeMapping edgeMapping = new EdgeMapping(sourceFields, targetFields);
+ edgeMapping.label("buy");
+
+ Assert.assertArrayEquals(new String[]{"id", "name", "age"}, sourceFields.toArray());
+ Assert.assertArrayEquals(new String[]{"id", "ISBN", "price"}, targetFields.toArray());
+
+ Assert.assertEquals("edge", edgeMapping.type().dataType());
+ Assert.assertEquals("edge-mapping(label=buy)", edgeMapping.toString());
+ }
+
+ @Test
+ public void testEdgeMappingSourceIsNull() {
+ List<String> targetFields = Arrays.asList("id", "ISBN", "price");
+ EdgeMapping edgeMapping = new EdgeMapping(null, targetFields);
+ edgeMapping.label("buy");
+ testEdgeMappingException(edgeMapping);
+ }
+
+ @Test
+ public void testEdgeMappingSourceIsEmpty() {
+ List<String> sourceFields = new ArrayList<>();
+ List<String> targetFields = Arrays.asList("id", "ISBN", "price");
+ EdgeMapping edgeMapping = new EdgeMapping(sourceFields, targetFields);
+ edgeMapping.label("buy");
+ testEdgeMappingException(edgeMapping);
+ }
+
+ @Test
+ public void testEdgeMappingTargetIsNull() {
+ List<String> sourceFields = Arrays.asList("id", "name", "age");
+ EdgeMapping edgeMapping = new EdgeMapping(sourceFields, null);
+ edgeMapping.label("buy");
+ testEdgeMappingException(edgeMapping);
+ }
+
+ @Test
+ public void testEdgeMappingTargetIsEmpty() {
+ List<String> sourceFields = Arrays.asList("id", "name", "age");
+ List<String> targetFields = new ArrayList<>();
+ EdgeMapping edgeMapping = new EdgeMapping(sourceFields, targetFields);
+ edgeMapping.label("buy");
+ testEdgeMappingException(edgeMapping);
+ }
+
+ private void testEdgeMappingException(EdgeMapping edgeMapping) {
+ Assert.assertThrows(IllegalArgumentException.class, edgeMapping::check);
+ }
+
+ @Test
+ public void testAdvancedEdgeMapping() {
+ List<String> sourceFields = Arrays.asList("id", "name", "age");
+ List<String> targetFields = Arrays.asList("id", "ISBN", "price");
+ EdgeMapping edgeMapping = new EdgeMapping(sourceFields, targetFields);
+ edgeMapping.label("buy");
+ edgeMapping.batchSize(100);
+ Assert.assertEquals(100, edgeMapping.batchSize());
+
+ edgeMapping.selectedFields(new HashSet<>(Arrays.asList("id", "name", "lang", "price")));
+ edgeMapping.ignoredFields(new HashSet<>(Arrays.asList("ISBN", "age")));
+
+ String[] selectFields = edgeMapping.selectedFields().toArray(new String[0]);
+ Assert.assertArrayEquals(new String[]{"price", "name", "id", "lang"}, selectFields);
+ String[] ignoredFields = edgeMapping.ignoredFields().toArray(new String[0]);
+ Assert.assertArrayEquals(new String[]{"ISBN", "age"}, ignoredFields);
+
+ HashMap<String, String> fieldMapping = new HashMap<>();
+ fieldMapping.put("source_name", "name");
+ fieldMapping.put("target_name", "name");
+ edgeMapping.mappingFields(fieldMapping);
+ }
+
+ @Test
+ public void testMappingFieldValueNotExists() {
+ List<String> sourceFields = Arrays.asList("id", "name", "age");
+ List<String> targetFields = Arrays.asList("id", "ISBN", "price");
+ EdgeMapping edgeMapping = new EdgeMapping(sourceFields, targetFields);
+ edgeMapping.label("buy");
+
+ HashMap<String, String> fieldMapping = new HashMap<>();
+ fieldMapping.put("source_name", "name");
+ edgeMapping.mappingFields(fieldMapping);
+
+ Assert.assertEquals("new_source_name", edgeMapping.mappingField("new_source_name"));
+ }
+
+ @Test
+ public void testMappingFieldValueIsNull() {
+ List<String> sourceFields = Arrays.asList("id", "name", "age");
+ List<String> targetFields = Arrays.asList("id", "ISBN", "price");
+ EdgeMapping edgeMapping = new EdgeMapping(sourceFields, targetFields);
+ edgeMapping.label("buy");
+
+ HashMap<String, String> fieldMapping = new HashMap<>();
+ // The value in field_mapping can't be null
+ fieldMapping.put("source_name", null);
+ edgeMapping.mappingFields(fieldMapping);
+
+ Assert.assertThrows(IllegalArgumentException.class, edgeMapping::check);
+ }
+
+ @Test
+ public void testMappingValue() {
+ List<String> sourceFields = Arrays.asList("id", "name", "age");
+ List<String> targetFields = Arrays.asList("id", "ISBN", "price");
+ EdgeMapping edgeMapping = new EdgeMapping(sourceFields, targetFields);
+ edgeMapping.label("buy");
+
+ Map<String, Object> cityMap = new HashMap<>();
+ cityMap.put("1", "Beijing");
+ cityMap.put("2", "Shanghai");
+ Map<String, Object> ageMap = new HashMap<>();
+ ageMap.put("young", 18);
+ ageMap.put("middle", 35);
+
+ Map<String, Map<String, Object>> valueMapping = new HashMap<>();
+ valueMapping.put("city", cityMap);
+ valueMapping.put("age", ageMap);
+
+ edgeMapping.mappingValues(valueMapping);
+
+ Assert.assertEquals("Beijing", edgeMapping.mappingValue("city", "1"));
+ Assert.assertEquals("Shanghai", edgeMapping.mappingValue("city", "2"));
+ Assert.assertEquals(18, edgeMapping.mappingValue("age", "young"));
+ Assert.assertEquals(35, edgeMapping.mappingValue("age", "middle"));
+
+ // not exist raw value
+ Assert.assertEquals("Wuhan", edgeMapping.mappingValue("city", "Wuhan"));
+ Assert.assertEquals("old", edgeMapping.mappingValue("age", "old"));
+ }
+
+ @Test
+ public void testMappingValueIsNull() {
+ List<String> sourceFields = Arrays.asList("id", "name", "age");
+ List<String> targetFields = Arrays.asList("id", "ISBN", "price");
+ EdgeMapping edgeMapping = new EdgeMapping(sourceFields, targetFields);
+ edgeMapping.label("buy");
+
+ Map<String, Object> cityMap = new HashMap<>();
+ // The value in value_mapping can't be null
+ cityMap.put("1", null);
+ cityMap.put("2", "Shanghai");
+
+ Map<String, Map<String, Object>> valueMapping = new HashMap<>();
+ valueMapping.put("city", cityMap);
+ edgeMapping.mappingValues(valueMapping);
+
+ Assert.assertThrows(IllegalArgumentException.class, edgeMapping::check);
+ }
+}
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/mapping/VertexMappingTest.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/mapping/VertexMappingTest.java
new file mode 100644
index 00000000..3f7b2d64
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/mapping/VertexMappingTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.mapping;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.hugegraph.testutil.Assert;
+import org.junit.Test;
+
+public class VertexMappingTest {
+
+ @Test
+ public void testBasicVertexMapping() {
+ VertexMapping vertexMapping = new VertexMapping("unique-id");
+ vertexMapping.label("person");
+
+ vertexMapping.check();
+
+ Assert.assertEquals("vertex", vertexMapping.type().dataType());
+ Assert.assertEquals("unique-id", vertexMapping.idField());
+ Assert.assertEquals("person", vertexMapping.label());
+ Assert.assertEquals("vertex-mapping(label=person)", vertexMapping.toString());
+ }
+
+ @Test
+ public void testAdvancedVertexMapping() {
+ VertexMapping vertexMapping = new VertexMapping("unique-id");
+ vertexMapping.label("person");
+ vertexMapping.batchSize(100);
+ Assert.assertEquals(100, vertexMapping.batchSize());
+
+ vertexMapping.selectedFields(new HashSet<>(Arrays.asList("id", "name", "lang", "price")));
+ vertexMapping.ignoredFields(new HashSet<>(Arrays.asList("ISBN", "age")));
+
+ String[] selectFields = vertexMapping.selectedFields().toArray(new String[0]);
+ Assert.assertArrayEquals(new String[]{"price", "name", "id", "lang"}, selectFields);
+ String[] ignoredFields = vertexMapping.ignoredFields().toArray(new String[0]);
+ Assert.assertArrayEquals(new String[]{"ISBN", "age"}, ignoredFields);
+
+ HashMap<String, String> fieldMapping = new HashMap<>();
+ fieldMapping.put("source_name", "name");
+ fieldMapping.put("target_name", "name");
+ vertexMapping.mappingFields(fieldMapping);
+ }
+
+ @Test
+ public void testMappingFieldValueNotExists() {
+ VertexMapping vertexMapping = new VertexMapping("unique-id");
+ vertexMapping.label("person");
+
+ HashMap<String, String> fieldMapping = new HashMap<>();
+ fieldMapping.put("source_name", "name");
+ vertexMapping.mappingFields(fieldMapping);
+
+ Assert.assertEquals("new_source_name", vertexMapping.mappingField("new_source_name"));
+ }
+
+ @Test
+ public void testMappingFieldValueIsNull() {
+ VertexMapping vertexMapping = new VertexMapping("unique-id");
+ vertexMapping.label("person");
+
+ HashMap<String, String> fieldMapping = new HashMap<>();
+ // The value in field_mapping can't be null
+ fieldMapping.put("source_name", null);
+ vertexMapping.mappingFields(fieldMapping);
+
+ Assert.assertThrows(IllegalArgumentException.class, vertexMapping::check);
+ }
+
+ @Test
+ public void testMappingValue() {
+ VertexMapping vertexMapping = new VertexMapping("unique-id");
+ vertexMapping.label("person");
+
+ Map<String, Object> cityMap = new HashMap<>();
+ cityMap.put("1", "Beijing");
+ cityMap.put("2", "Shanghai");
+ Map<String, Object> ageMap = new HashMap<>();
+ ageMap.put("young", 18);
+ ageMap.put("middle", 35);
+
+ Map<String, Map<String, Object>> valueMapping = new HashMap<>();
+ valueMapping.put("city", cityMap);
+ valueMapping.put("age", ageMap);
+
+ vertexMapping.mappingValues(valueMapping);
+
+ Assert.assertEquals("Beijing", vertexMapping.mappingValue("city", "1"));
+ Assert.assertEquals("Shanghai", vertexMapping.mappingValue("city", "2"));
+ Assert.assertEquals(18, vertexMapping.mappingValue("age", "young"));
+ Assert.assertEquals(35, vertexMapping.mappingValue("age", "middle"));
+
+ // not exist raw value
+ Assert.assertEquals("Wuhan", vertexMapping.mappingValue("city", "Wuhan"));
+ Assert.assertEquals("old", vertexMapping.mappingValue("age", "old"));
+ }
+
+ @Test
+ public void testMappingValueIsNull() {
+ VertexMapping vertexMapping = new VertexMapping("unique-id");
+ vertexMapping.label("person");
+
+ Map<String, Object> cityMap = new HashMap<>();
+ // The value in value_mapping can't be null
+ cityMap.put("1", null);
+ cityMap.put("2", "Shanghai");
+
+ Map<String, Map<String, Object>> valueMapping = new HashMap<>();
+ valueMapping.put("city", cityMap);
+ vertexMapping.mappingValues(valueMapping);
+
+ Assert.assertThrows(IllegalArgumentException.class, vertexMapping::check);
+ }
+}
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/options/HGOptionsTest.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/options/HGOptionsTest.java
new file mode 100644
index 00000000..03c10499
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/options/HGOptionsTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.options;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hugegraph.testutil.Assert;
+import org.junit.Test;
+
+public class HGOptionsTest {
+
+ private static final int CPUS = Runtime.getRuntime().availableProcessors();
+
+ @Test
+ public void testOptionDefaultValue() {
+ Map<String, String> requiredConfigs = new HashMap<>();
+ requiredConfigs.put("data-type", "vertex");
+ requiredConfigs.put("label", "person");
+ requiredConfigs.put("id", "name");
+
+ HGOptions hgOptions = new HGOptions(requiredConfigs);
+ Assert.assertEquals("localhost", hgOptions.host());
+ Assert.assertEquals(8080, hgOptions.port());
+ Assert.assertEquals("hugegraph", hgOptions.graph());
+ Assert.assertEquals("http", hgOptions.protocol());
+ Assert.assertNull(hgOptions.username());
+ Assert.assertNull(hgOptions.token());
+ Assert.assertEquals(60, hgOptions.timeout());
+ Assert.assertEquals(CPUS * 4, hgOptions.maxConnection());
+ Assert.assertEquals(CPUS * 2, hgOptions.maxConnectionPerRoute());
+ Assert.assertNull(hgOptions.trustStoreFile());
+ Assert.assertNull(hgOptions.trustStoreToken());
+
+ Assert.assertEquals(500, hgOptions.batchSize());
+ Assert.assertEquals(",", hgOptions.delimiter());
+ Assert.assertEquals(0, hgOptions.selectedFields().size());
+ Assert.assertEquals(0, hgOptions.ignoredFields().size());
+ }
+
+ @Test
+ public void testMissingDataType() {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("label", "person");
+ configs.put("id", "name");
+ testMissingRequiredOption(configs);
+ }
+
+ @Test
+ public void testMissingLabel() {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("data-type", "vertex");
+ configs.put("id", "name");
+ testMissingRequiredOption(configs);
+ }
+
+ @Test
+ public void testEdgeMissingSourceNames() {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("data-type", "edge");
+ configs.put("label", "actIn");
+ testMissingRequiredOption(configs);
+ }
+
+ @Test
+ public void testEdgeMissingTargetNames() {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("data-type", "edge");
+ configs.put("label", "actIn");
+ configs.put("source-name", "person,book");
+ testMissingRequiredOption(configs);
+ }
+
+ private void testMissingRequiredOption(Map<String, String> configs) {
+ Assert.assertThrows(IllegalArgumentException.class, () -> {
+ new HGOptions(configs);
+ });
+ }
+
+ @Test
+ public void testSelectedFields() {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("data-type", "vertex");
+ configs.put("label", "person");
+ configs.put("id", "name");
+ configs.put("selected-fields", "id,name,age");
+
+ HGOptions hgOptions = new HGOptions(configs);
+ Set<String> selectedFieldSet = hgOptions.selectedFields();
+ String[] selectFields = selectedFieldSet.toArray(new String[0]);
+
+ Assert.assertArrayEquals(new String[]{"name", "id", "age"}, selectFields);
+ }
+
+ @Test
+ public void testIgnoredFields() {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("data-type", "vertex");
+ configs.put("label", "person");
+ configs.put("id", "name");
+ configs.put("ignored-fields", "id,name,age");
+
+ HGOptions hgOptions = new HGOptions(configs);
+ Set<String> ignoreFieldSet = hgOptions.ignoredFields();
+ String[] ignoreFields = ignoreFieldSet.toArray(new String[0]);
+
+ Assert.assertArrayEquals(new String[]{"name", "id", "age"}, ignoreFields);
+ }
+
+ @Test
+ public void testSelectedAndIgnoredFields() {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("data-type", "vertex");
+ configs.put("label", "person");
+ configs.put("id", "name");
+ configs.put("selected-fields", "id,name,age");
+ configs.put("ignored-fields", "price,ISBN");
+
+ Assert.assertThrows(IllegalArgumentException.class, () -> {
+ new HGOptions(configs);
+ });
+ }
+
+ @Test
+ public void testEdgeSourceAndTargetNames() {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("data-type", "edge");
+ configs.put("label", "actIn");
+ configs.put("source-name", "person,book");
+ configs.put("target-name", "movie,competition");
+
+ HGOptions hgOptions = new HGOptions(configs);
+
+ String[] sourceNames = hgOptions.sourceName().toArray(new String[0]);
+ Assert.assertArrayEquals(new String[]{"person", "book"}, sourceNames);
+
+ String[] targetNames = hgOptions.targetName().toArray(new String[0]);
+ Assert.assertArrayEquals(new String[]{"movie", "competition"}, targetNames);
+ }
+}
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/DataTypeUtilsTest.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/DataTypeUtilsTest.java
new file mode 100644
index 00000000..dbca930e
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/DataTypeUtilsTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.utils;
+
+import java.util.Date;
+import java.util.UUID;
+
+import org.apache.hugegraph.structure.constant.Cardinality;
+import org.apache.hugegraph.structure.constant.DataType;
+import org.apache.hugegraph.structure.schema.PropertyKey;
+import org.apache.hugegraph.testutil.Assert;
+import org.junit.Test;
+
+public class DataTypeUtilsTest {
+
+ @Test
+ public void testParseNumber() {
+ Assert.assertEquals(12345,
+ DataTypeUtils.parseNumber("valid-int-value", 12345L));
+ Assert.assertEquals(123456789987654L,
+ DataTypeUtils.parseNumber("valid-long-value", 123456789987654L));
+ Assert.assertEquals(123456789987654L,
+ DataTypeUtils.parseNumber("valid-long-value", "123456789987654"));
+ Assert.assertThrows(IllegalArgumentException.class, () -> {
+ DataTypeUtils.parseNumber("in-valid-long-value", "abc123456789");
+ });
+ }
+
+ @Test
+ public void testParseUUID() {
+ UUID uuid = UUID.randomUUID();
+ Assert.assertEquals(uuid, DataTypeUtils.parseUUID("uuid", uuid));
+ Assert.assertEquals(uuid, DataTypeUtils.parseUUID("string-uuid", uuid.toString()));
+ Assert.assertThrows(IllegalArgumentException.class, () -> {
+ DataTypeUtils.parseUUID("invalid-uuid", "hugegraph");
+ });
+ }
+
+ @Test
+ public void convertSingleString() {
+ PropertyKey propertyKey = new PropertyKey("name");
+ Assert.assertEquals("josh", DataTypeUtils.convert("josh", propertyKey));
+ }
+
+ @Test
+ public void convertSingleNumber() {
+ PropertyKey.Builder builder = new PropertyKey.BuilderImpl("number", null);
+ PropertyKey bytePropertyKey = builder.dataType(DataType.BYTE)
+ .cardinality(Cardinality.SINGLE)
+ .build();
+ Assert.assertEquals((byte) 25, DataTypeUtils.convert("25", bytePropertyKey));
+ Assert.assertEquals((byte) 25, DataTypeUtils.convert(25, bytePropertyKey));
+
+ PropertyKey intPropertyKey = builder.dataType(DataType.INT)
+ .cardinality(Cardinality.SINGLE)
+ .build();
+ Assert.assertEquals(20230920, DataTypeUtils.convert("20230920", intPropertyKey));
+ Assert.assertEquals(20230920, DataTypeUtils.convert(20230920, intPropertyKey));
+
+ PropertyKey longPropertyKey = builder.dataType(DataType.LONG)
+ .cardinality(Cardinality.SINGLE)
+ .build();
+ Assert.assertEquals(202309205435343242L, DataTypeUtils.convert("202309205435343242",
+ longPropertyKey));
+ Assert.assertEquals(202309205435343242L, DataTypeUtils.convert(202309205435343242L,
+ longPropertyKey));
+
+ PropertyKey floatPropertyKey = builder.dataType(DataType.FLOAT)
+ .cardinality(Cardinality.SINGLE)
+ .build();
+ Assert.assertEquals(123.45f, DataTypeUtils.convert("123.45", floatPropertyKey));
+ Assert.assertEquals(123.45f, DataTypeUtils.convert(123.45, floatPropertyKey));
+
+ PropertyKey doublePropertyKey = builder.dataType(DataType.DOUBLE)
+ .cardinality(Cardinality.SINGLE)
+ .build();
+ Assert.assertEquals(1235443.48956783, DataTypeUtils.convert("1235443.48956783",
+ doublePropertyKey));
+ Assert.assertEquals(1235443.48956783, DataTypeUtils.convert(1235443.48956783, doublePropertyKey));
+ }
+
+ @Test
+ public void testConvertSingleBoolean() {
+ PropertyKey.Builder builder = new PropertyKey.BuilderImpl("boolean", null);
+ PropertyKey booleanPropertyKey = builder.dataType(DataType.BOOLEAN)
+ .cardinality(Cardinality.SINGLE)
+ .build();
+
+ Assert.assertEquals(true, DataTypeUtils.convert(true, booleanPropertyKey));
+ Assert.assertEquals(true, DataTypeUtils.convert("true", booleanPropertyKey));
+ Assert.assertEquals(true, DataTypeUtils.convert("True", booleanPropertyKey));
+ Assert.assertEquals(true, DataTypeUtils.convert("TRUE", booleanPropertyKey));
+ Assert.assertEquals(true, DataTypeUtils.convert("yes", booleanPropertyKey));
+ Assert.assertEquals(true, DataTypeUtils.convert("y", booleanPropertyKey));
+ Assert.assertEquals(true, DataTypeUtils.convert("1", booleanPropertyKey));
+
+ Assert.assertEquals(false, DataTypeUtils.convert(false, booleanPropertyKey));
+ Assert.assertEquals(false, DataTypeUtils.convert("false", booleanPropertyKey));
+ Assert.assertEquals(false, DataTypeUtils.convert("False", booleanPropertyKey));
+ Assert.assertEquals(false, DataTypeUtils.convert("FALSE", booleanPropertyKey));
+ Assert.assertEquals(false, DataTypeUtils.convert("no", booleanPropertyKey));
+ Assert.assertEquals(false, DataTypeUtils.convert("n", booleanPropertyKey));
+ Assert.assertEquals(false, DataTypeUtils.convert("0", booleanPropertyKey));
+
+ Assert.assertThrows(IllegalArgumentException.class, () -> {
+ DataTypeUtils.convert("right", booleanPropertyKey);
+ });
+
+ Assert.assertThrows(IllegalArgumentException.class, () -> {
+ DataTypeUtils.convert(123, booleanPropertyKey);
+ });
+ }
+
+ @Test
+ public void testConvertSingleUUID() {
+ PropertyKey.Builder builder = new PropertyKey.BuilderImpl("uuid", null);
+ PropertyKey uuidPropertyKey = builder.dataType(DataType.UUID)
+ .cardinality(Cardinality.SINGLE)
+ .build();
+ UUID uuid = UUID.randomUUID();
+ Assert.assertEquals(uuid, DataTypeUtils.convert(uuid, uuidPropertyKey));
+ Assert.assertEquals(uuid, DataTypeUtils.convert(uuid.toString(), uuidPropertyKey));
+ Assert.assertThrows(IllegalArgumentException.class, () -> {
+ DataTypeUtils.convert("hugegraph", uuidPropertyKey);
+ });
+ }
+
+ @Test
+ public void testConvertSingleDate() {
+ PropertyKey.Builder builder = new PropertyKey.BuilderImpl("date", null);
+ PropertyKey datePropertyKey = builder.dataType(DataType.DATE)
+ .cardinality(Cardinality.SINGLE)
+ .build();
+
+ Date date = new Date(1695233374320L);
+ Assert.assertEquals(date, DataTypeUtils.convert(date, datePropertyKey));
+ Assert.assertEquals(date, DataTypeUtils.convert(1695233374320L, datePropertyKey));
+ // TODO check
+ // Assert.assertEquals(date, DataTypeUtils.convert("2023-09-21 02:09:34", datePropertyKey));
+ Assert.assertThrows(IllegalArgumentException.class, () -> {
+ DataTypeUtils.convert(date.toString(), datePropertyKey);
+ });
+ Assert.assertThrows(IllegalArgumentException.class, () -> {
+ DataTypeUtils.convert("abc", datePropertyKey);
+ });
+ }
+}
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/HGEnvUtils.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/HGEnvUtils.java
new file mode 100644
index 00000000..28f112d4
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/HGEnvUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.utils;
+
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.driver.SchemaManager;
+
+public class HGEnvUtils {
+
+ public static final String DEFAULT_HOST = "127.0.0.1";
+ public static final String DEFAULT_PORT = "8080";
+ public static final String DEFAULT_GRAPH = "hugegraph";
+ public static final String DEFAULT_URL = "http://" + DEFAULT_HOST + ":" + DEFAULT_PORT;
+
+ private static HugeClient hugeClient;
+
+ public static void createEnv() {
+
+ hugeClient = HugeClient.builder(DEFAULT_URL, DEFAULT_GRAPH).build();
+
+ hugeClient.graphs().clearGraph(DEFAULT_GRAPH, "I'm sure to delete all data");
+
+ SchemaManager schema = hugeClient.schema();
+
+ // Define schema
+ schema.propertyKey("name").asText().ifNotExist().create();
+ schema.propertyKey("age").asInt().ifNotExist().create();
+ schema.propertyKey("city").asText().ifNotExist().create();
+ schema.propertyKey("weight").asDouble().ifNotExist().create();
+ schema.propertyKey("lang").asText().ifNotExist().create();
+ schema.propertyKey("date").asText().ifNotExist().create();
+ schema.propertyKey("price").asDouble().ifNotExist().create();
+
+ schema.vertexLabel("person")
+ .properties("name", "age", "city")
+ .useCustomizeStringId()
+ .nullableKeys("age", "city")
+ .ifNotExist()
+ .create();
+
+ schema.vertexLabel("software")
+ .properties("name", "lang", "price")
+ .primaryKeys("name")
+ .ifNotExist()
+ .create();
+
+ schema.edgeLabel("knows")
+ .sourceLabel("person")
+ .targetLabel("person")
+ .properties("date", "weight")
+ .ifNotExist()
+ .create();
+
+ schema.edgeLabel("created")
+ .sourceLabel("person")
+ .targetLabel("software")
+ .properties("date", "weight")
+ .ifNotExist()
+ .create();
+ }
+
+ public static void destroyEnv() {
+ hugeClient.close();
+ }
+
+ public static HugeClient getHugeClient() {
+ return hugeClient;
+ }
+}
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/HGUtilsTest.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/HGUtilsTest.java
new file mode 100644
index 00000000..b25eb15f
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/HGUtilsTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hugegraph.spark.connector.mapping.EdgeMapping;
+import org.apache.hugegraph.spark.connector.mapping.VertexMapping;
+import org.apache.hugegraph.spark.connector.options.HGOptions;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HGUtilsTest {
+
+ @Test
+ public void testVertexMappingFromConf() {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("data-type", "vertex");
+ configs.put("label", "person");
+ configs.put("id", "name");
+ configs.put("batch-size", "100");
+ configs.put("ignored-fields", "price,ISBN");
+
+ HGOptions options = new HGOptions(configs);
+ VertexMapping vertexMapping = HGUtils.vertexMappingFromConf(options);
+
+ Assert.assertEquals("vertex", vertexMapping.type().dataType());
+ Assert.assertEquals("person", vertexMapping.label());
+ Assert.assertEquals("name", vertexMapping.idField());
+ Assert.assertEquals(100, vertexMapping.batchSize());
+ Assert.assertArrayEquals(new String[]{"ISBN", "price"},
+ vertexMapping.ignoredFields().toArray());
+ }
+
+ @Test
+ public void testEdgeMappingFromConf() {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("data-type", "edge");
+ configs.put("label", "buy");
+ configs.put("source-name", "id,name,age");
+ configs.put("target-name", "price,ISBN");
+ configs.put("batch-size", "200");
+ configs.put("selected-fields", "price,pages");
+
+ HGOptions options = new HGOptions(configs);
+ EdgeMapping edgeMapping = HGUtils.edgeMappingFromConf(options);
+
+ Assert.assertEquals("edge", edgeMapping.type().dataType());
+ Assert.assertEquals("buy", edgeMapping.label());
+ Assert.assertEquals(200, edgeMapping.batchSize());
+
+ Assert.assertArrayEquals(new String[]{"id", "name", "age"},
+ edgeMapping.sourceFields().toArray(new String[0]));
+ Assert.assertArrayEquals(new String[]{"price", "ISBN"},
+ edgeMapping.targetFields().toArray(new String[0]));
+ Assert.assertArrayEquals(new String[]{"pages", "price"},
+ edgeMapping.selectedFields().toArray(new String[0]));
+ }
+}
diff --git a/hugegraph-spark-connector/src/test/scala/org/apache/hugegraph/spark/connector/SinkExampleTest.scala b/hugegraph-spark-connector/src/test/scala/org/apache/hugegraph/spark/connector/SinkExampleTest.scala
new file mode 100644
index 00000000..2219c3b1
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/scala/org/apache/hugegraph/spark/connector/SinkExampleTest.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector
+
+import org.apache.hugegraph.driver.HugeClient
+import org.apache.hugegraph.spark.connector.utils.HGEnvUtils
+import org.apache.hugegraph.structure.graph.Vertex
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.runners.MethodSorters
+import org.junit.{AfterClass, BeforeClass, FixMethodOrder, Test}
+
+import java.util
+import scala.collection.JavaConverters.iterableAsScalaIterableConverter
+
+object SinkExampleTest {
+
+ var client: HugeClient = _
+
+ val sparkSession: SparkSession = SparkSession.builder()
+ .master("local[*]")
+ .appName(this.getClass.getSimpleName)
+ .getOrCreate()
+
+ @BeforeClass
+ def beforeClass(): Unit = {
+ HGEnvUtils.createEnv()
+ client = HGEnvUtils.getHugeClient
+ }
+
+ @AfterClass
+ def afterClass(): Unit = {
+ HGEnvUtils.destroyEnv()
+ sparkSession.stop()
+ }
+}
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+class SinkExampleTest {
+
+ val client: HugeClient = SinkExampleTest.client
+ val sparkSession: SparkSession = SinkExampleTest.sparkSession
+
+ val DEFAULT_ENTRANCE: String = "org.apache.hugegraph.spark.connector.DataSource"
+ val DEFAULT_HOST: String = HGEnvUtils.DEFAULT_HOST
+ val DEFAULT_PORT: String = HGEnvUtils.DEFAULT_PORT
+ val DEFAULT_GRAPH: String = HGEnvUtils.DEFAULT_GRAPH
+
+ @Test
+ def testFirstInsertVertexPerson(): Unit = {
+ val df = sparkSession.createDataFrame(Seq(
+ Tuple3("marko", 29, "Beijing"),
+ Tuple3("vadas", 27, "HongKong"),
+ Tuple3("Josh", 32, "Beijing"),
+ Tuple3("peter", 35, "ShangHai"),
+ Tuple3("li,nary", 26, "Wu,han"),
+ Tuple3("Bob", 18, "HangZhou"),
+ )).toDF("name", "age", "city")
+
+ df.show()
+
+ df.write
+ .format(DEFAULT_ENTRANCE)
+ .option("host", DEFAULT_HOST)
+ .option("port", DEFAULT_PORT)
+ .option("graph", DEFAULT_GRAPH)
+ .option("data-type", "vertex")
+ .option("label", "person")
+ .option("id", "name")
+ .option("batch-size", 2)
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ val vertices: util.List[Vertex] = client.graph().listVertices("person")
+ assertEquals(6, vertices.size())
+ }
+
+ @Test
+ def testFirstInsertVertexSoftware(): Unit = {
+ val df = sparkSession.createDataFrame(Seq(
+ Tuple4("lop", "java", 328L, "ISBN978-7-107-18618-5"),
+ Tuple4("ripple", "python", 199L, "ISBN978-7-100-13678-5"),
+ )).toDF("name", "lang", "price", "ISBN")
+
+ df.show()
+
+ df.write
+ .format(DEFAULT_ENTRANCE)
+ .option("host", DEFAULT_HOST)
+ .option("port", DEFAULT_PORT)
+ .option("graph", DEFAULT_GRAPH)
+ .option("data-type", "vertex")
+ .option("label", "software")
+ .option("ignored-fields", "ISBN")
+ .option("batch-size", 2)
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ val vertices = client.graph().listVertices("software").asScala.toList
+ assertEquals(2, vertices.size)
+
+ for (vertex <- vertices) {
+ val properties = vertex.properties()
+ assertTrue(!properties.containsKey("ISBN"))
+ }
+ }
+
+ @Test
+ def testSecondInsertEdgeKnows(): Unit = {
+ val df = sparkSession.createDataFrame(Seq(
+ Tuple4("marko", "vadas", "20160110", 0.5),
+ Tuple4("peter", "Josh", "20230801", 1.0),
+ Tuple4("peter", "li,nary", "20130220", 2.0)
+ )).toDF("source", "target", "date", "weight")
+
+ df.show()
+
+ df.write
+ .format(DEFAULT_ENTRANCE)
+ .option("host", DEFAULT_HOST)
+ .option("port", DEFAULT_PORT)
+ .option("graph", DEFAULT_GRAPH)
+ .option("data-type", "edge")
+ .option("label", "knows")
+ .option("source-name", "source")
+ .option("target-name", "target")
+ .option("batch-size", 2)
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ val edges = client.graph().listEdges("knows")
+ assertEquals(3, edges.size())
+ }
+
+ @Test
+ def testSecondInsertEdgeCreated(): Unit = {
+ val df = sparkSession.createDataFrame(Seq(
+ Tuple4("marko", "lop", "20171210", 0.5),
+ Tuple4("Josh", "lop", "20091111", 0.4),
+ Tuple4("peter", "ripple", "20171210", 1.0),
+ Tuple4("vadas", "lop", "20171210", 0.2)
+ )).toDF("source", "name", "date", "weight")
+
+ df.show()
+
+ df.write
+ .format(DEFAULT_ENTRANCE)
+ .option("host", DEFAULT_HOST)
+ .option("port", DEFAULT_PORT)
+ .option("graph", DEFAULT_GRAPH)
+ .option("data-type", "edge")
+ .option("label", "created")
+ .option("source-name", "source") // customize
+ .option("target-name", "name") // pk
+ .option("batch-size", 2)
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ val edges = client.graph().listEdges("created")
+ assertEquals(4, edges.size())
+ }
+}
diff --git a/pom.xml b/pom.xml
index b11b9e7f..7eb4cb50 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,6 +90,7 @@
<modules>
<module>hugegraph-client</module>
<module>hugegraph-loader</module>
+ <module>hugegraph-spark-connector</module>
<module>hugegraph-tools</module>
<module>hugegraph-hubble</module>
<module>hugegraph-dist</module>