Posted to commits@hugegraph.apache.org by ji...@apache.org on 2023/10/16 08:32:53 UTC

[incubator-hugegraph-toolchain] branch master updated: feat(spark): support spark-sink connector for loader (#497)

This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git


The following commit(s) were added to refs/heads/master by this push:
     new 71e623d3 feat(spark): support spark-sink connector for loader (#497)
71e623d3 is described below

commit 71e623d369db00a8f1f82e2ab3f5170daee6dbfa
Author: Liu Xiao <42...@users.noreply.github.com>
AuthorDate: Mon Oct 16 16:32:47 2023 +0800

    feat(spark): support spark-sink connector for loader (#497)
    
    done for V1
    
    ---------
    
    Co-authored-by: imbajin <ji...@apache.org>
---
 .github/workflows/spark-connector-ci.yml           |  57 +++
 .../scripts/dependency/known-dependencies.txt      |  19 +-
 hugegraph-spark-connector/README.md                | 193 ++++++++++
 .../assembly/travis/install-hugegraph-from-tar.sh  |  35 ++
 hugegraph-spark-connector/pom.xml                  | 257 +++++++++++++
 .../spark/connector/builder/EdgeBuilder.java       | 200 ++++++++++
 .../spark/connector/builder/ElementBuilder.java    | 403 +++++++++++++++++++++
 .../spark/connector/builder/VertexBuilder.java     |  83 +++++
 .../spark/connector/client/HGClientHolder.java     | 111 ++++++
 .../spark/connector/client/HGLoadContext.java      |  94 +++++
 .../spark/connector/client/SchemaCache.java        | 113 ++++++
 .../spark/connector/constant/Constants.java        |  71 ++++
 .../spark/connector/constant/DataTypeEnum.java     |  47 +++
 .../spark/connector/exception/LoadException.java   |  39 ++
 .../spark/connector/mapping/EdgeMapping.java       |  66 ++++
 .../spark/connector/mapping/ElementMapping.java    | 143 ++++++++
 .../spark/connector/mapping/VertexMapping.java     |  51 +++
 .../spark/connector/options/HGOptions.java         | 266 ++++++++++++++
 .../spark/connector/utils/DataTypeUtils.java       | 215 +++++++++++
 .../hugegraph/spark/connector/utils/DateUtils.java |  48 +++
 .../hugegraph/spark/connector/utils/HGUtils.java   |  64 ++++
 .../src/main/resources/log4j2.xml                  |  72 ++++
 .../hugegraph/spark/connector/DataSource.scala     |  55 +++
 .../apache/hugegraph/spark/connector/HGTable.scala |  54 +++
 .../spark/connector/utils/HGBuildUtils.scala       |  78 ++++
 .../spark/connector/writer/HGBatchWriter.scala     |  43 +++
 .../connector/writer/HGBatchWriterFactory.scala    |  41 +++
 .../spark/connector/writer/HGCommitMessage.scala   |  22 ++
 .../spark/connector/writer/HGEdgeWriter.scala      |  85 +++++
 .../spark/connector/writer/HGVertexWriter.scala    |  86 +++++
 .../spark/connector/writer/HGWriterBuilder.scala   |  38 ++
 .../spark/connector/builder/EdgeBuilderTest.java   | 105 ++++++
 .../spark/connector/builder/VertexBuilderTest.java | 177 +++++++++
 .../spark/connector/client/HGClientHolderTest.java |  45 +++
 .../spark/connector/mapping/EdgeMappingTest.java   | 184 ++++++++++
 .../spark/connector/mapping/VertexMappingTest.java | 133 +++++++
 .../spark/connector/options/HGOptionsTest.java     | 156 ++++++++
 .../spark/connector/utils/DataTypeUtilsTest.java   | 162 +++++++++
 .../spark/connector/utils/HGEnvUtils.java          |  84 +++++
 .../spark/connector/utils/HGUtilsTest.java         |  75 ++++
 .../spark/connector/SinkExampleTest.scala          | 177 +++++++++
 pom.xml                                            |   1 +
 42 files changed, 4439 insertions(+), 9 deletions(-)

diff --git a/.github/workflows/spark-connector-ci.yml b/.github/workflows/spark-connector-ci.yml
new file mode 100644
index 00000000..efa4639b
--- /dev/null
+++ b/.github/workflows/spark-connector-ci.yml
@@ -0,0 +1,57 @@
+name: "hugegraph-spark-connector-ci"
+
+on:
+  push:
+    branches:
+      - master
+      - 'release-*'
+    paths:
+      - hugegraph-spark-connector/**
+      - hugegraph-dist/**
+      - .github/workflows/**
+      - pom.xml
+  pull_request:
+    paths:
+      - hugegraph-spark-connector/**
+      - hugegraph-dist/**
+      - hugegraph-client/**
+      - .github/workflows/**
+      - pom.xml
+
+jobs:
+  spark-connector-ci:
+    runs-on: ubuntu-latest
+    env:
+      TRAVIS_DIR: hugegraph-spark-connector/assembly/travis
+      VERSION_ID: 1.0.0
+    steps:
+      - name: Install JDK 11
+        uses: actions/setup-java@v3
+        with:
+          java-version: '11'
+          distribution: 'adopt'
+
+      - name: Cache Maven packages
+        uses: actions/cache@v3
+        with:
+          path: ~/.m2
+          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
+          restore-keys: ${{ runner.os }}-m2
+
+      - name: Checkout
+        uses: actions/checkout@v4
+        with:
+          fetch-depth: 2
+
+      - name: Compile
+        run: |
+          mvn install -pl hugegraph-spark-connector -Dmaven.javadoc.skip=true -DskipTests -ntp
+
+      - name: Prepare env and service
+        run: |
+          $TRAVIS_DIR/install-hugegraph-from-tar.sh $VERSION_ID
+
+      - name: Run test
+        run: |
+          cd hugegraph-spark-connector && ls
+          mvn test
diff --git a/hugegraph-dist/scripts/dependency/known-dependencies.txt b/hugegraph-dist/scripts/dependency/known-dependencies.txt
index 6a67bf29..bacffaed 100644
--- a/hugegraph-dist/scripts/dependency/known-dependencies.txt
+++ b/hugegraph-dist/scripts/dependency/known-dependencies.txt
@@ -1,7 +1,3 @@
-HdrHistogram-2.1.9.jar
-HikariCP-3.2.0.jar
-LatencyUtils-2.0.3.jar
-ST4-4.0.4.jar
 accessors-smart-1.2.jar
 accessors-smart-2.4.2.jar
 aircompressor-0.10.jar
@@ -79,8 +75,8 @@ hadoop-yarn-client-3.3.1.jar
 hadoop-yarn-common-3.3.1.jar
 hbase-client-2.2.3.jar
 hbase-common-2.2.3.jar
-hbase-hadoop-compat-2.2.3.jar
 hbase-hadoop2-compat-2.2.3.jar
+hbase-hadoop-compat-2.2.3.jar
 hbase-http-2.2.3.jar
 hbase-mapreduce-2.2.3.jar
 hbase-metrics-2.2.3.jar
@@ -95,7 +91,9 @@ hbase-shaded-miscellaneous-2.2.1.jar
 hbase-shaded-netty-2.2.1.jar
 hbase-shaded-protobuf-2.2.1.jar
 hbase-zookeeper-2.2.3.jar
+HdrHistogram-2.1.9.jar
 hibernate-validator-6.0.17.Final.jar
+HikariCP-3.2.0.jar
 hive-classification-3.1.3.jar
 hive-common-3.1.3.jar
 hive-exec-3.1.3.jar
@@ -146,16 +144,17 @@ jakarta.xml.bind-api-2.3.2.jar
 jakarta.xml.bind-api-4.0.0-RC2.jar
 jamon-runtime-2.4.1.jar
 javassist-3.24.0-GA.jar
+javassist-3.25.0-GA.jar
 javassist-3.28.0-GA.jar
 javax.activation-api-1.2.0.jar
 javax.annotation-api-1.3.2.jar
 javax.el-3.0.0.jar
 javax.el-3.0.1-b12.jar
 javax.json-1.0.jar
-javax.servlet-api-3.1.0.jar
-javax.servlet-api-4.0.1.jar
 javax.servlet.jsp-2.3.2.jar
 javax.servlet.jsp-api-2.3.1.jar
+javax.servlet-api-3.1.0.jar
+javax.servlet-api-4.0.1.jar
 javolution-5.5.1.jar
 jaxb-api-2.2.11.jar
 jaxb-api-2.3.1.jar
@@ -227,13 +226,14 @@ kerby-config-1.0.1.jar
 kerby-pkix-1.0.1.jar
 kerby-util-1.0.1.jar
 kerby-xdr-1.0.1.jar
+LatencyUtils-2.0.3.jar
 leveldbjni-all-1.8.jar
 libthrift-0.9.3.jar
 lightning-csv-8.2.1.jar
 listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
+log4j-1.2.17.jar
 log4j-1.2-api-2.11.2.jar
 log4j-1.2-api-2.17.1.jar
-log4j-1.2.17.jar
 log4j-api-2.18.0.jar
 log4j-core-2.18.0.jar
 log4j-jul-2.11.2.jar
@@ -316,8 +316,9 @@ spring-jdbc-5.1.9.RELEASE.jar
 spring-tx-5.1.9.RELEASE.jar
 spring-web-5.1.9.RELEASE.jar
 spring-webmvc-5.1.9.RELEASE.jar
-stax-api-1.0.1.jar
+ST4-4.0.4.jar
 stax2-api-4.2.1.jar
+stax-api-1.0.1.jar
 threeten-extra-1.5.0.jar
 token-provider-1.0.1.jar
 tomcat-embed-core-9.0.24.jar
diff --git a/hugegraph-spark-connector/README.md b/hugegraph-spark-connector/README.md
new file mode 100644
index 00000000..4cfc3b2c
--- /dev/null
+++ b/hugegraph-spark-connector/README.md
@@ -0,0 +1,193 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with this
+work for additional information regarding copyright ownership. The ASF
+licenses this file to You under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+-->
+
+# HugeGraph Spark Connector
+
+[![License](https://img.shields.io/badge/license-Apache%202-0E78BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
+
+HugeGraph Spark Connector is a Spark connector for reading and writing HugeGraph data as standard Spark DataFrames.
+
+## Building
+
+Required:
+
+- Java 8+
+- Maven 3.6+
+
+To build without executing tests:
+
+```bash
+mvn clean package -DskipTests
+```
+
+To build and run the default tests:
+
+```bash
+mvn clean package
+```
+
+## How to use
+
+Suppose we have a graph whose schema is defined as follows:
+
+### Schema
+
+```groovy
+schema.propertyKey("name").asText().ifNotExist().create()
+schema.propertyKey("age").asInt().ifNotExist().create()
+schema.propertyKey("city").asText().ifNotExist().create()
+schema.propertyKey("weight").asDouble().ifNotExist().create()
+schema.propertyKey("lang").asText().ifNotExist().create()
+schema.propertyKey("date").asText().ifNotExist().create()
+schema.propertyKey("price").asDouble().ifNotExist().create()
+
+schema.vertexLabel("person")
+        .properties("name", "age", "city")
+        .useCustomizeStringId()
+        .nullableKeys("age", "city")
+        .ifNotExist()
+        .create()
+
+schema.vertexLabel("software")
+        .properties("name", "lang", "price")
+        .primaryKeys("name")
+        .ifNotExist()
+        .create()
+
+schema.edgeLabel("knows")
+        .sourceLabel("person")
+        .targetLabel("person")
+        .properties("date", "weight")
+        .ifNotExist()
+        .create()
+
+schema.edgeLabel("created")
+        .sourceLabel("person")
+        .targetLabel("software")
+        .properties("date", "weight")
+        .ifNotExist()
+        .create()
+```
+
+Then we can insert graph data through Spark. First, add the dependency to your `pom.xml`:
+
+```xml
+<dependency>
+    <groupId>org.apache.hugegraph</groupId>
+    <artifactId>hugegraph-spark-connector</artifactId>
+    <version>${revision}</version>
+</dependency>
+```
+
+### Vertex Sink
+
+```scala
+val df = sparkSession.createDataFrame(Seq(
+  Tuple3("marko", 29, "Beijing"),
+  Tuple3("vadas", 27, "HongKong"),
+  Tuple3("Josh", 32, "Beijing"),
+  Tuple3("peter", 35, "ShangHai"),
+  Tuple3("li,nary", 26, "Wu,han"),
+  Tuple3("Bob", 18, "HangZhou"),
+)).toDF("name", "age", "city")
+
+df.show()
+
+df.write
+  .format("org.apache.hugegraph.spark.connector.DataSource")
+  .option("host", "127.0.0.1")
+  .option("port", "8080")
+  .option("graph", "hugegraph")
+  .option("data-type", "vertex")
+  .option("label", "person")
+  .option("id", "name")
+  .option("batch-size", 2)
+  .mode(SaveMode.Overwrite)
+  .save()
+```
+
+### Edge Sink
+
+```scala
+val df = sparkSession.createDataFrame(Seq(
+  Tuple4("marko", "vadas", "20160110", 0.5),
+  Tuple4("peter", "Josh", "20230801", 1.0),
+  Tuple4("peter", "li,nary", "20130220", 2.0)
+)).toDF("source", "target", "date", "weight")
+
+df.show()
+
+df.write
+  .format("org.apache.hugegraph.spark.connector.DataSource")
+  .option("host", "127.0.0.1")
+  .option("port", "8080")
+  .option("graph", "hugegraph")
+  .option("data-type", "edge")
+  .option("label", "knows")
+  .option("source-name", "source")
+  .option("target-name", "target")
+  .option("batch-size", 2)
+  .mode(SaveMode.Overwrite)
+  .save()
+```
+
+### Configs
+
+#### Client Configs
+
+Client Configs are used to configure hugegraph-client.
+
+| Params               | Default Value | Description                                                                                  |
+|----------------------|---------------|----------------------------------------------------------------------------------------------|
+| `host`               | `localhost`   | Address of HugeGraphServer                                                                   |
+| `port`               | `8080`        | Port of HugeGraphServer                                                                      |
+| `graph`              | `hugegraph`   | Graph space name                                                                             |
+| `protocol`           | `http`        | Protocol for sending requests to the server, optional `http` or `https`                      |
+| `username`           | `null`        | Username for the current graph when HugeGraphServer has authentication enabled               |
+| `token`              | `null`        | Token for the current graph when HugeGraphServer has authentication enabled                  |
+| `timeout`            | `60`          | Timeout in seconds for an insert request to return                                           |
+| `max-conn`           | `CPUS * 4`    | The maximum number of HTTP connections between HugeClient and HugeGraphServer                |
+| `max-conn-per-route` | `CPUS * 2`    | The maximum number of HTTP connections for each route between HugeClient and HugeGraphServer |
+| `trust-store-file`   | `null`        | The client's trust store file path when the request protocol is https                        |
+| `trust-store-token`  | `null`        | The client's trust store password when the request protocol is https                         |
+
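+For example, a write against a HugeGraphServer with authentication enabled might
+pass the client options like this (the host and credentials are illustrative):
+
+```scala
+df.write
+  .format("org.apache.hugegraph.spark.connector.DataSource")
+  .option("host", "graph.example.com")   // illustrative address
+  .option("port", "8080")
+  .option("graph", "hugegraph")
+  .option("username", "admin")           // only needed when auth is enabled
+  .option("token", "<your-token>")
+  .option("timeout", "120")
+  .option("data-type", "vertex")
+  .option("label", "person")
+  .option("id", "name")
+  .mode(SaveMode.Overwrite)
+  .save()
+```
+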
+#### Graph Data Configs
+
+Graph Data Configs describe the vertex/edge data to be written and how DataFrame columns map to it.
+
+| Params            | Default Value | Description                                                                                                                                                                                                                                                                                                                                                                                                                |
+|-------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `data-type`       |               | Graph data type, must be `vertex` or `edge`                                                                                                                                                                                                                                         |
+| `label`           |               | Label of the vertex/edge data to be imported                                                                                                                                                                                                                                        |
+| `id`              |               | The column to use as the vertex id. Required when the vertex id strategy is CUSTOMIZE; must be empty when the id strategy is PRIMARY_KEY                                                                                                                                            |
+| `source-name`     |               | The column(s) of the input source used to build the source vertex id. When the id strategy of the source vertex is CUSTOMIZE, exactly one column must be specified as the id column; when it is PRIMARY_KEY, one or more columns must be specified for splicing the generated vertex id. In other words, this item is required regardless of the id strategy |
+| `target-name`     |               | The column(s) used to build the target vertex id, analogous to `source-name`                                                                                                                                                                                                        |
+| `selected-fields` |               | Insert only the selected columns; cannot be used together with `ignored-fields`                                                                                                                                                                                                     |
+| `ignored-fields`  |               | Exclude the given columns from insertion; cannot be used together with `selected-fields`                                                                                                                                                                                            |
+| `batch-size`      | `500`         | The number of data items in each batch when importing data                                                                                                                                                                                                                                                                                                                                                                 |
+
+#### Common Configs
+
+Common Configs contain general settings shared by both the vertex and edge sinks.
+
+| Params      | Default Value | Description                                                                     |
+|-------------|---------------|---------------------------------------------------------------------------------|
+| `delimiter` | `,`           | Separator of `source-name`, `target-name`, `selected-fields` or `ignored-fields` |
+
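+Columns can also be filtered at write time. Below is a minimal sketch that skips
+two helper columns during a vertex write using a semicolon delimiter (the column
+names are hypothetical):
+
+```scala
+df.write
+  .format("org.apache.hugegraph.spark.connector.DataSource")
+  .option("host", "127.0.0.1")
+  .option("port", "8080")
+  .option("graph", "hugegraph")
+  .option("data-type", "vertex")
+  .option("label", "person")
+  .option("id", "name")
+  .option("ignored-fields", "tmp_col;debug_col")  // hypothetical helper columns
+  .option("delimiter", ";")
+  .mode(SaveMode.Overwrite)
+  .save()
+```
+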
+## License
+
+Same as HugeGraph, hugegraph-spark-connector is also licensed under the Apache License 2.0.
\ No newline at end of file
diff --git a/hugegraph-spark-connector/assembly/travis/install-hugegraph-from-tar.sh b/hugegraph-spark-connector/assembly/travis/install-hugegraph-from-tar.sh
new file mode 100755
index 00000000..00f388bf
--- /dev/null
+++ b/hugegraph-spark-connector/assembly/travis/install-hugegraph-from-tar.sh
@@ -0,0 +1,35 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to You under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+set -ev
+
+if [[ $# -ne 1 ]]; then
+    echo "Must input an existing commit id of hugegraph server" && exit 1
+fi
+
+VERSION=$1
+HUGEGRAPH_LINK="https://downloads.apache.org/incubator/hugegraph/${VERSION}/apache-hugegraph-incubating-${VERSION}.tar.gz"
+
+wget "${HUGEGRAPH_LINK}" -q --show-progress || exit 1
+tar zxvf apache-hugegraph-incubating-${VERSION}.tar.gz
+cd apache-hugegraph-incubating-${VERSION}
+
+## start HugeGraphServer with http protocol
+bin/init-store.sh
+bin/start-hugegraph.sh
+
+cd ../
diff --git a/hugegraph-spark-connector/pom.xml b/hugegraph-spark-connector/pom.xml
new file mode 100644
index 00000000..5f595f07
--- /dev/null
+++ b/hugegraph-spark-connector/pom.xml
@@ -0,0 +1,257 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with this
+  work for additional information regarding copyright ownership. The ASF
+  licenses this file to You under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+  License for the specific language governing permissions and limitations
+  under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.hugegraph</groupId>
+        <artifactId>hugegraph-toolchain</artifactId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>hugegraph-spark-connector</artifactId>
+    <packaging>jar</packaging>
+    <name>${project.artifactId}</name>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <spark.version>3.2.2</spark.version>
+        <spark.scope>provided</spark.scope>
+        <hugegraph.client.version>1.0.0</hugegraph.client.version>
+        <jackson.version>2.12.3</jackson.version>
+        <jersey.version>3.0.3</jersey.version>
+        <jersey.container.servlet.core.version>2.34</jersey.container.servlet.core.version>
+        <log4j.version>2.18.0</log4j.version>
+        <slf4j.api.version>1.7.25</slf4j.api.version>
+        <scala.minor.version>2.12.11</scala.minor.version>
+        <assembly.dir>${project.basedir}/assembly</assembly.dir>
+        <assembly.descriptor.dir>${assembly.dir}/descriptor</assembly.descriptor.dir>
+        <release.name>${project.artifactId}</release.name>
+        <final.name>apache-${release.name}-incubating-${project.version}</final.name>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>${spark.scope}</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>jul-to-slf4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>jcl-over-slf4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>${spark.scope}</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-api</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hugegraph</groupId>
+            <artifactId>hugegraph-client</artifactId>
+            <version>${hugegraph.client.version}</version>
+        </dependency>
+
+        <!-- jersey version override -->
+        <dependency>
+            <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-client</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-common</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.inject</groupId>
+            <artifactId>jersey-hk2</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-server</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.jersey.containers</groupId>
+            <artifactId>jersey-container-servlet-core</artifactId>
+            <version>${jersey.container.servlet.core.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- jackson version override -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.jaxrs</groupId>
+            <artifactId>jackson-jaxrs-base</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.jaxrs</groupId>
+            <artifactId>jackson-jaxrs-json-provider</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-jaxb-annotations</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+
+        <!-- logging version override -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.api.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>compile</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <version>2.15.2</version>
+                <configuration>
+                    <scalaVersion>${scala.minor.version}</scalaVersion>
+                    <args>
+                        <arg>-target:jvm-1.8</arg>
+                    </args>
+                    <jvmArgs>
+                        <jvmArg>-Xss4096K</jvmArg>
+                    </jvmArgs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>scala-compile</id>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                        <configuration>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                            </excludes>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.5.3</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <finalName>
+                        ${project.artifactId}-${project.version}-jar-with-dependencies
+                    </finalName>
+                    <appendAssemblyId>false</appendAssemblyId>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.2.0</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/EdgeBuilder.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/EdgeBuilder.java
new file mode 100644
index 00000000..9de225f8
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/EdgeBuilder.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.builder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.hugegraph.spark.connector.client.HGLoadContext;
+import org.apache.hugegraph.spark.connector.mapping.EdgeMapping;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.graph.Vertex;
+import org.apache.hugegraph.structure.schema.EdgeLabel;
+import org.apache.hugegraph.structure.schema.SchemaLabel;
+import org.apache.hugegraph.structure.schema.VertexLabel;
+import org.apache.hugegraph.util.E;
+
+import com.google.common.collect.ImmutableList;
+
+public class EdgeBuilder extends ElementBuilder<Edge> {
+
+    private final EdgeMapping mapping;
+
+    private final EdgeLabel edgeLabel;
+
+    private final VertexLabel sourceLabel;
+
+    private final VertexLabel targetLabel;
+
+    // Used to optimize access performance
+    private VertexIdsIndex vertexIdsIndex;
+
+    public EdgeBuilder(HGLoadContext context, EdgeMapping mapping) {
+        super(context);
+        this.mapping = mapping;
+        this.edgeLabel = this.getEdgeLabel(this.mapping.label());
+        this.sourceLabel = this.getVertexLabel(this.edgeLabel.sourceLabel());
+        this.targetLabel = this.getVertexLabel(this.edgeLabel.targetLabel());
+        // Ensure that the source/target id fields are matched with id strategy
+        this.checkIdFields(this.sourceLabel, this.mapping.sourceFields());
+        this.checkIdFields(this.targetLabel, this.mapping.targetFields());
+
+        this.vertexIdsIndex = null;
+    }
+
+    @Override
+    public EdgeMapping mapping() {
+        return this.mapping;
+    }
+
+    @Override
+    public SchemaLabel schemaLabel() {
+        return this.edgeLabel;
+    }
+
+    @Override
+    protected boolean isIdField(String fieldName) {
+        return this.mapping.sourceFields().contains(fieldName) ||
+               this.mapping.targetFields().contains(fieldName);
+    }
+
+    @Override
+    public List<Edge> build(String[] names, Object[] values) {
+        if (Objects.isNull(this.vertexIdsIndex)) {
+            this.vertexIdsIndex = this.extractVertexIdsIndex(names);
+        }
+        EdgeKVPairs kvPairs = this.newEdgeKVPairs();
+        // Extract each endpoint vertex's idField and idValue (id or pk strategy)
+        kvPairs.source.extractFromEdge(names, values, this.vertexIdsIndex.sourceIndexes);
+        kvPairs.target.extractFromEdge(names, values, this.vertexIdsIndex.targetIndexes);
+        kvPairs.extractProperties(names, values);
+
+        List<Vertex> sources = kvPairs.source.buildVertices(false, true);
+        List<Vertex> targets = kvPairs.target.buildVertices(false, true);
+        if (sources.isEmpty() || targets.isEmpty()) {
+            return ImmutableList.of();
+        }
+        E.checkArgument(sources.size() == 1 || targets.size() == 1 ||
+                        sources.size() == targets.size(),
+                        "The elements number of source and target must be: " +
+                        "1 to n, n to 1, n to n");
+        int size = Math.max(sources.size(), targets.size());
+        List<Edge> edges = new ArrayList<>(size);
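+        // For 1-to-n or n-to-1, reuse the single endpoint; for n-to-n, pair endpoints by index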
+        for (int i = 0; i < size; i++) {
+            Vertex source = i < sources.size() ? sources.get(i) : sources.get(0);
+            Vertex target = i < targets.size() ? targets.get(i) : targets.get(0);
+            Edge edge = new Edge(this.mapping.label());
+            edge.source(source);
+            edge.target(target);
+            this.addProperties(edge, kvPairs.properties);
+            edges.add(edge);
+        }
+        return edges;
+    }
+
+    private EdgeKVPairs newEdgeKVPairs() {
+        EdgeKVPairs kvPairs = new EdgeKVPairs();
+        kvPairs.source = this.newKVPairs(this.sourceLabel);
+        kvPairs.target = this.newKVPairs(this.targetLabel);
+        return kvPairs;
+    }
+
+    private void checkIdFields(VertexLabel vertexLabel, List<String> fields) {
+        if (vertexLabel.idStrategy().isCustomize()) {
+            E.checkArgument(fields.size() == 1,
+                            "The source/target field can contains only one " +
+                            "column when id strategy is CUSTOMIZE");
+        } else if (vertexLabel.idStrategy().isPrimaryKey()) {
+            E.checkArgument(fields.size() >= 1,
+                            "The source/target field must contains some " +
+                            "columns when id strategy is PrimaryKey");
+        } else {
+            throw new IllegalArgumentException("Unsupported AUTOMATIC id strategy " +
+                                               "for hugegraph Spark Connector.");
+        }
+    }
+
+    public class EdgeKVPairs {
+
+        // No general properties
+        private VertexKVPairs source;
+
+        private VertexKVPairs target;
+
+        // General properties
+        private Map<String, Object> properties;
+
+        public void extractProperties(String[] names, Object[] values) {
+            // General properties
+            this.properties = new HashMap<>();
+            Set<String> props = schemaLabel().properties();
+            for (int i = 0; i < names.length; i++) {
+                String fieldName = names[i];
+                Object fieldValue = values[i];
+                if (!retainField(fieldName, fieldValue)) {
+                    continue;
+                }
+
+                String key = mapping.mappingField(fieldName);
+                if (isIdField(fieldName) && !props.contains(fieldName) && !props.contains(key)) {
+                    continue;
+                }
+
+                Object value = mappingValue(fieldName, fieldValue);
+                this.properties.put(key, value);
+            }
+        }
+    }
+
+    private VertexIdsIndex extractVertexIdsIndex(String[] names) {
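+        // Resolve each configured source/target field name to its column position in the input row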
+        VertexIdsIndex index = new VertexIdsIndex();
+        index.sourceIndexes = new int[this.mapping.sourceFields().size()];
+        int idx = 0;
+        for (String field : this.mapping.sourceFields()) {
+            for (int pos = 0; pos < names.length; pos++) {
+                String name = names[pos];
+                if (field.equals(name)) {
+                    index.sourceIndexes[idx++] = pos;
+                }
+            }
+        }
+
+        index.targetIndexes = new int[this.mapping.targetFields().size()];
+        idx = 0;
+        for (String field : this.mapping.targetFields()) {
+            for (int pos = 0; pos < names.length; pos++) {
+                String name = names[pos];
+                if (field.equals(name)) {
+                    index.targetIndexes[idx++] = pos;
+                }
+            }
+        }
+        return index;
+    }
+
+    private static class VertexIdsIndex {
+
+        private int[] sourceIndexes;
+
+        private int[] targetIndexes;
+    }
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/ElementBuilder.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/ElementBuilder.java
new file mode 100644
index 00000000..43089fbd
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/ElementBuilder.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.builder;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CoderResult;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.collections.ListUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hugegraph.spark.connector.client.HGLoadContext;
+import org.apache.hugegraph.spark.connector.client.SchemaCache;
+import org.apache.hugegraph.spark.connector.constant.Constants;
+import org.apache.hugegraph.spark.connector.mapping.ElementMapping;
+import org.apache.hugegraph.spark.connector.utils.DataTypeUtils;
+import org.apache.hugegraph.structure.GraphElement;
+import org.apache.hugegraph.structure.constant.IdStrategy;
+import org.apache.hugegraph.structure.graph.Vertex;
+import org.apache.hugegraph.structure.schema.EdgeLabel;
+import org.apache.hugegraph.structure.schema.PropertyKey;
+import org.apache.hugegraph.structure.schema.SchemaLabel;
+import org.apache.hugegraph.structure.schema.VertexLabel;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.LongEncoding;
+
+import com.google.common.collect.ImmutableList;
+
+public abstract class ElementBuilder<GE extends GraphElement> {
+
+    private final SchemaCache schema;
+
+    // NOTE: CharsetEncoder is not thread safe
+    private final CharsetEncoder encoder;
+
+    private final ByteBuffer buffer;
+
+    public ElementBuilder(HGLoadContext context) {
+        this.schema = context.schemaCache();
+        this.encoder = Constants.CHARSET.newEncoder();
+        this.buffer = ByteBuffer.allocate(Constants.VERTEX_ID_LIMIT);
+    }
+
+    public abstract ElementMapping mapping();
+
+    public abstract SchemaLabel schemaLabel();
+
+    protected abstract boolean isIdField(String fieldName);
+
+    public abstract List<GE> build(String[] names, Object[] values);
+
+    protected VertexKVPairs newKVPairs(VertexLabel vertexLabel) {
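+        // Choose the extractor that matches the vertex label's id strategy (CUSTOMIZE vs PRIMARY_KEY)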
+        IdStrategy idStrategy = vertexLabel.idStrategy();
+        if (idStrategy.isCustomize()) {
+            return new VertexIdKVPairs(vertexLabel);
+        } else {
+            assert idStrategy.isPrimaryKey();
+            return new VertexPkKVPairs(vertexLabel);
+        }
+    }
+
+    /**
+     * Retain only the key-value pairs needed by the current vertex or edge
+     */
+    protected boolean retainField(String fieldName, Object fieldValue) {
+        ElementMapping mapping = this.mapping();
+        Set<String> selectedFields = mapping.selectedFields();
+        Set<String> ignoredFields = mapping.ignoredFields();
+        // Retain selected fields or remove ignored fields
+        if (!selectedFields.isEmpty() && !selectedFields.contains(fieldName)) {
+            return false;
+        }
+        return ignoredFields.isEmpty() || !ignoredFields.contains(fieldName);
+    }
+
+    protected void addProperty(GraphElement element, String key, Object value) {
+        value = this.convertPropertyValue(key, value);
+        element.property(key, value);
+    }
+
+    protected void addProperties(GraphElement element, Map<String, Object> properties) {
+        for (Map.Entry<String, Object> entry : properties.entrySet()) {
+            String key = entry.getKey();
+            Object value = entry.getValue();
+            value = this.convertPropertyValue(key, value);
+            element.property(key, value);
+        }
+    }
+
+    protected PropertyKey getPropertyKey(String name) {
+        return this.schema.getPropertyKey(name);
+    }
+
+    protected VertexLabel getVertexLabel(String name) {
+        return this.schema.getVertexLabel(name);
+    }
+
+    protected EdgeLabel getEdgeLabel(String name) {
+        return this.schema.getEdgeLabel(name);
+    }
+
+    protected Object mappingValue(String fieldName, Object fieldValue) {
+        if (this.mapping().mappingValues().isEmpty()) {
+            return fieldValue;
+        }
+        String fieldStrValue = String.valueOf(fieldValue);
+        return this.mapping().mappingValue(fieldName, fieldStrValue);
+    }
+
+    private void customizeId(VertexLabel vertexLabel, Vertex vertex,
+                             String idField, Object idValue) {
+        E.checkArgumentNotNull(idField, "The vertex id field can't be null");
+        E.checkArgumentNotNull(idValue, "The vertex id value can't be null");
+        IdStrategy idStrategy = vertexLabel.idStrategy();
+        if (idStrategy.isCustomizeString()) {
+            String id = (String) idValue;
+            this.checkVertexIdLength(id);
+            vertex.id(id);
+        } else if (idStrategy.isCustomizeNumber()) {
+            Long id = DataTypeUtils.parseNumber(idField, idValue);
+            vertex.id(id);
+        } else {
+            assert idStrategy.isCustomizeUuid();
+            UUID id = DataTypeUtils.parseUUID(idField, idValue);
+            vertex.id(id);
+        }
+    }
+
+    private Object convertPropertyValue(String key, Object rawValue) {
+        PropertyKey propertyKey = this.getPropertyKey(key);
+        return DataTypeUtils.convert(rawValue, propertyKey);
+    }
+
+    private boolean vertexIdEmpty(VertexLabel vertexLabel, Vertex vertex) {
+        IdStrategy idStrategy = vertexLabel.idStrategy();
+        if (idStrategy.isCustomizeString()) {
+            Object vertexId = vertex.id();
+            return vertexId == null || StringUtils.isEmpty((String) vertexId);
+        }
+        return false;
+    }
+
+    private void checkPrimaryValuesValid(VertexLabel vertexLabel, Object[] primaryValues) {
+        List<String> primaryKeys = vertexLabel.primaryKeys();
+        E.checkArgument(primaryKeys.size() == primaryValues.length,
+                        "Missing some primary key values, expect %s, " +
+                        "but only got %s for vertex label '%s'",
+                        primaryKeys, Arrays.toString(primaryValues), vertexLabel);
+
+        for (int i = 0; i < primaryKeys.size(); i++) {
+            E.checkArgument(primaryValues[i] != null,
+                            "Make sure the value of the primary key '%s' is " +
+                            "not empty, or check whether the headers or " +
+                            "field_mapping are configured correctly",
+                            primaryKeys.get(i));
+        }
+    }
+
+    private String spliceVertexId(VertexLabel vertexLabel, Object... primaryValues) {
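+        // Splice primary-key values into an id of the form "<labelId>:<pk1>!<pk2>...", escaping special characters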
+        StringBuilder vertexId = new StringBuilder();
+        StringBuilder vertexKeysId = new StringBuilder();
+        for (Object value : primaryValues) {
+            String pkValue;
+            if (value instanceof Number || value instanceof Date) {
+                pkValue = LongEncoding.encodeNumber(value);
+            } else {
+                pkValue = String.valueOf(value);
+            }
+            if (StringUtils.containsAny(pkValue, Constants.SEARCH_LIST)) {
+                pkValue = StringUtils.replaceEach(pkValue,
+                                                  Constants.SEARCH_LIST,
+                                                  Constants.TARGET_LIST);
+            }
+            vertexKeysId.append(pkValue);
+            vertexKeysId.append("!");
+        }
+        vertexId.append(vertexLabel.id()).append(":").append(vertexKeysId);
+        vertexId.deleteCharAt(vertexId.length() - 1);
+        return vertexId.toString();
+    }
+
+    private void checkVertexIdLength(String id) {
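+        // Encode the id into a fixed-size buffer; a non-underflow result means it exceeds VERTEX_ID_LIMIT bytes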
+        this.encoder.reset();
+        this.buffer.clear();
+        CoderResult r = this.encoder.encode(CharBuffer.wrap(id.toCharArray()),
+                                            this.buffer, true);
+        E.checkArgument(r.isUnderflow(),
+                        "The vertex id length exceeds limit %s : '%s'",
+                        Constants.VERTEX_ID_LIMIT, id);
+    }
+
+    private boolean isEmptyPkValue(Object pkValue) {
+        if (pkValue == null) {
+            return true;
+        }
+        if (pkValue instanceof String) {
+            String pkValueStr = (String) pkValue;
+            return pkValueStr.isEmpty();
+        }
+        return false;
+    }
+
+    public abstract static class VertexKVPairs {
+
+        public final VertexLabel vertexLabel;
+
+        // General properties
+        public Map<String, Object> properties;
+
+        public VertexKVPairs(VertexLabel vertexLabel) {
+            this.vertexLabel = vertexLabel;
+            this.properties = null;
+        }
+
+        public abstract void extractFromVertex(String[] names, Object[] values);
+
+        public abstract void extractFromEdge(String[] names, Object[] values, int[] fieldIndexes);
+
+        public abstract List<Vertex> buildVertices(boolean withProperty, boolean withId);
+    }
+
+    public class VertexIdKVPairs extends VertexKVPairs {
+
+        // The idField(raw field), like: id
+        private String idField;
+
+        // The single idValue(mapped), like: A -> 1
+        private Object idValue;
+
+        public VertexIdKVPairs(VertexLabel vertexLabel) {
+            super(vertexLabel);
+        }
+
+        @Override
+        public void extractFromVertex(String[] names, Object[] values) {
+            // General properties
+            this.properties = new HashMap<>();
+            for (int i = 0; i < names.length; i++) {
+                String fieldName = names[i];
+                Object fieldValue = values[i];
+                if (!retainField(fieldName, fieldValue)) {
+                    continue;
+                }
+                if (isIdField(fieldName)) {
+                    this.idField = fieldName;
+                    this.idValue = mappingValue(fieldName, fieldValue);
+                } else {
+                    String key = mapping().mappingField(fieldName);
+                    Object value = mappingValue(fieldName, fieldValue);
+                    this.properties.put(key, value);
+                }
+            }
+        }
+
+        @Override
+        public void extractFromEdge(String[] names, Object[] values, int[] fieldIndexes) {
+            assert fieldIndexes.length == 1;
+            String fieldName = names[fieldIndexes[0]];
+            Object fieldValue = values[fieldIndexes[0]];
+            this.idField = fieldName;
+            this.idValue = mappingValue(fieldName, fieldValue);
+        }
+
+        @Override
+        public List<Vertex> buildVertices(boolean withProperty, boolean withId) {
+            Vertex vertex = new Vertex(vertexLabel.name());
+            customizeId(vertexLabel, vertex, this.idField, this.idValue);
+            // empty string id ("")
+            if (vertexIdEmpty(vertexLabel, vertex)) {
+                return ImmutableList.of();
+            }
+            if (withProperty) {
+                String key = mapping().mappingField(this.idField);
+                // The id field is also used as a general property
+                if (vertexLabel.properties().contains(key)) {
+                    addProperty(vertex, key, this.idValue);
+                }
+                addProperties(vertex, this.properties);
+            }
+            return ImmutableList.of(vertex);
+        }
+    }
+
+    public class VertexPkKVPairs extends VertexKVPairs {
+
+        /*
+         * The primary key names(mapped), allowed multiple
+         * like: [p_name,p_age] -> [name,age]
+         */
+        private List<String> pkNames;
+
+        /*
+         * The primary values(mapped), length is the same as pkNames
+         * like: [m,2] -> [marko,18]
+         */
+        private Object[] pkValues;
+
+        public VertexPkKVPairs(VertexLabel vertexLabel) {
+            super(vertexLabel);
+        }
+
+        @Override
+        public void extractFromVertex(String[] names, Object[] values) {
+            List<String> primaryKeys = this.vertexLabel.primaryKeys();
+            this.pkNames = primaryKeys;
+            this.pkValues = new Object[primaryKeys.size()];
+            // General properties
+            this.properties = new HashMap<>();
+            for (int i = 0; i < names.length; i++) {
+                String fieldName = names[i];
+                Object fieldValue = values[i];
+                if (!retainField(fieldName, fieldValue)) {
+                    continue;
+                }
+                String key = mapping().mappingField(fieldName);
+                if (primaryKeys.contains(key)) {
+                    // Don't put primary key/values into general properties
+                    int index = primaryKeys.indexOf(key);
+                    Object pkValue = mappingValue(fieldName, fieldValue);
+                    this.pkValues[index] = pkValue;
+                } else {
+                    Object value = mappingValue(fieldName, fieldValue);
+                    this.properties.put(key, value);
+                }
+            }
+        }
+
+        @Override
+        public void extractFromEdge(String[] names, Object[] values, int[] fieldIndexes) {
+            this.pkNames = new ArrayList<>(fieldIndexes.length);
+            for (int fieldIndex : fieldIndexes) {
+                String fieldName = names[fieldIndex];
+                String mappingField = mapping().mappingField(fieldName);
+                this.pkNames.add(mappingField);
+            }
+            List<String> primaryKeys = this.vertexLabel.primaryKeys();
+            E.checkArgument(ListUtils.isEqualList(this.pkNames, primaryKeys),
+                            "Make sure the the primary key fields %s are " +
+                            "not empty, or check whether the headers or " +
+                            "field_mapping are configured correctly",
+                            primaryKeys);
+            this.pkValues = new Object[this.pkNames.size()];
+            for (int i = 0; i < fieldIndexes.length; i++) {
+                String fieldName = names[fieldIndexes[i]];
+                Object fieldValue = values[fieldIndexes[i]];
+                Object pkValue = mappingValue(fieldName, fieldValue);
+                this.pkValues[i] = pkValue;
+            }
+        }
+
+        @Override
+        public List<Vertex> buildVertices(boolean withProperty, boolean withId) {
+            checkPrimaryValuesValid(vertexLabel, this.pkValues);
+            for (int i = 0; i < this.pkNames.size(); i++) {
+                if (isEmptyPkValue(this.pkValues[i])) {
+                    return ImmutableList.of();
+                }
+                Object pkValue = convertPropertyValue(this.pkNames.get(i), this.pkValues[i]);
+                this.pkValues[i] = pkValue;
+            }
+            String id = spliceVertexId(vertexLabel, this.pkValues);
+            checkVertexIdLength(id);
+
+            Vertex vertex = new Vertex(vertexLabel.name());
+            // NOTE: withProperty == true means we are parsing a vertex rather than an edge endpoint
+            if (withProperty) {
+                for (int i = 0; i < this.pkNames.size(); i++) {
+                    addProperty(vertex, this.pkNames.get(i), this.pkValues[i]);
+                }
+                addProperties(vertex, this.properties);
+            }
+            if (withId) {
+                vertex.id(id);
+            } else {
+                vertex.id(null);
+            }
+            return ImmutableList.of(vertex);
+        }
+    }
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/VertexBuilder.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/VertexBuilder.java
new file mode 100644
index 00000000..bb78e571
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/builder/VertexBuilder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.builder;
+
+import java.util.List;
+
+import org.apache.hugegraph.spark.connector.client.HGLoadContext;
+import org.apache.hugegraph.spark.connector.mapping.VertexMapping;
+import org.apache.hugegraph.structure.graph.Vertex;
+import org.apache.hugegraph.structure.schema.SchemaLabel;
+import org.apache.hugegraph.structure.schema.VertexLabel;
+import org.apache.hugegraph.util.E;
+
+public class VertexBuilder extends ElementBuilder<Vertex> {
+
+    private final VertexMapping mapping;
+
+    private final VertexLabel vertexLabel;
+
+    public VertexBuilder(HGLoadContext context, VertexMapping mapping) {
+        super(context);
+        this.mapping = mapping;
+        this.vertexLabel = this.getVertexLabel(this.mapping.label());
+        // Ensure the id field is matched with id strategy
+        this.checkIdField();
+    }
+
+    @Override
+    public VertexMapping mapping() {
+        return this.mapping;
+    }
+
+    @Override
+    public SchemaLabel schemaLabel() {
+        return this.vertexLabel;
+    }
+
+    @Override
+    protected boolean isIdField(String fieldName) {
+        return fieldName.equals(this.mapping.idField());
+    }
+
+    @Override
+    public List<Vertex> build(String[] names, Object[] values) {
+        VertexKVPairs kvPairs = this.newKVPairs(this.vertexLabel);
+        kvPairs.extractFromVertex(names, values);
+        return kvPairs.buildVertices(true, false);
+    }
+
+    private void checkIdField() {
+        String name = this.vertexLabel.name();
+        if (this.vertexLabel.idStrategy().isCustomize()) {
+            E.checkState(this.mapping.idField() != null,
+                         "The id field can't be empty or null when " +
+                         "id strategy is '%s' for vertex label '%s'",
+                         this.vertexLabel.idStrategy(), name);
+        } else if (this.vertexLabel.idStrategy().isPrimaryKey()) {
+            E.checkState(this.mapping.idField() == null,
+                         "The id field must be empty or null when " +
+                         "id strategy is '%s' for vertex label '%s'",
+                         this.vertexLabel.idStrategy(), name);
+        } else {
+            // The id strategy is automatic
+            throw new IllegalArgumentException("Unsupported AUTOMATIC id strategy for " +
+                                               "hugegraph Spark Connector.");
+        }
+    }
+}
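
For reference, a minimal end-to-end sketch of the vertex path (not part of the
patch): it assumes a HugeGraph server reachable at localhost:8080 whose 'person'
label uses the CUSTOMIZE id strategy, and wires together the HGOptions,
HGLoadContext and HGUtils classes introduced in this commit:

    Map<String, String> conf = new HashMap<>();
    conf.put("host", "localhost");
    conf.put("port", "8080");
    conf.put("graph", "hugegraph");
    conf.put("data-type", "vertex");
    conf.put("label", "person");
    conf.put("id", "name");  // required because 'person' uses the CUSTOMIZE id strategy
    HGLoadContext context = new HGLoadContext(new HGOptions(conf));
    context.updateSchemaCache();
    VertexMapping mapping = HGUtils.vertexMappingFromConf(context.options());
    VertexBuilder builder = new VertexBuilder(context, mapping);
    // One row, flattened into parallel column-name/value arrays
    List<Vertex> vertices = builder.build(new String[]{"name", "age"},
                                          new Object[]{"marko", 29});
    context.close();
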
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/HGClientHolder.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/HGClientHolder.java
new file mode 100644
index 00000000..65f08183
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/HGClientHolder.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.client;
+
+import java.io.Serializable;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.driver.HugeClientBuilder;
+import org.apache.hugegraph.exception.ServerException;
+import org.apache.hugegraph.rest.ClientException;
+import org.apache.hugegraph.spark.connector.constant.Constants;
+import org.apache.hugegraph.spark.connector.exception.LoadException;
+import org.apache.hugegraph.spark.connector.options.HGOptions;
+import org.apache.hugegraph.util.E;
+
+public final class HGClientHolder implements Serializable {
+
+    public static HugeClient create(HGOptions options) {
+        String host = options.host();
+        int port = options.port();
+        String graph = options.graph();
+        String username = options.username();
+        String token = options.token();
+        String trustStoreFile = options.trustStoreFile();
+        String trustStoreToken = options.trustStoreToken();
+
+        boolean useHttps = options.protocol().equals(Constants.HTTPS_SCHEMA);
+        String address = host + Constants.COLON_STR + port;
+        if (!host.startsWith(Constants.HTTP_PREFIX) && !host.startsWith(Constants.HTTPS_PREFIX)) {
+            if (useHttps) {
+                address = Constants.HTTPS_PREFIX + address;
+            } else {
+                address = Constants.HTTP_PREFIX + address;
+            }
+        }
+        username = Objects.nonNull(username) ? username : graph;
+        HugeClientBuilder builder;
+        try {
+            builder = HugeClient.builder(address, graph)
+                                .configUser(username, token)
+                                .configTimeout(options.timeout())
+                                .configPool(options.maxConnection(),
+                                            options.maxConnectionPerRoute());
+            if (useHttps) {
+                String trustFile = trustStoreFile;
+                if (Objects.isNull(trustStoreFile)) {
+                    String homePath = System.getProperty(Constants.CONNECTOR_HOME_PATH);
+                    E.checkArgument(StringUtils.isNotEmpty(homePath),
+                                    "The system property 'connector.home.path' " +
+                                    "can't be null or empty when enable https protocol");
+                    trustFile = Paths.get(homePath, Constants.TRUST_STORE_PATH).toString();
+                }
+                trustStoreToken = Objects.isNull(trustStoreToken) ?
+                                  Constants.DEFAULT_TRUST_STORE_TOKEN : trustStoreToken;
+                builder.configSSL(trustFile, trustStoreToken);
+            }
+            return builder.build();
+        } catch (IllegalStateException e) {
+            String message = e.getMessage();
+            if (Objects.nonNull(message) && message.startsWith("The version")) {
+                throw new LoadException("The version of hugegraph-client and " +
+                                        "hugegraph-server don't match", e);
+            }
+            throw e;
+        } catch (ServerException e) {
+            String message = e.getMessage();
+            if (Constants.STATUS_UNAUTHORIZED == e.status() ||
+                (Objects.nonNull(message) && message.startsWith("Authentication"))) {
+                throw new LoadException("Incorrect username or password", e);
+            }
+            throw e;
+        } catch (ClientException e) {
+            Throwable cause = e.getCause();
+            if (cause == null || cause.getMessage() == null) {
+                throw e;
+            }
+            String message = cause.getMessage();
+            if (message.contains("Connection refused")) {
+                throw new LoadException(
+                        String.format("The service %s:%s is unavailable", host, port), e);
+            } else if (message.contains("java.net.UnknownHostException") ||
+                       message.contains("Host name may not be null")) {
+                throw new LoadException(String.format("The host %s is unknown", host), e);
+            } else if (message.contains("connect timed out")) {
+                throw new LoadException(String.format("Connect service %s:%s timeout, " +
+                                                      "please check service is available " +
+                                                      "and network is unobstructed", host, port),
+                                        e);
+            }
+            throw e;
+        }
+    }
+}
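
A hedged sketch of creating a client directly from the connector options (all
values are placeholders; note that HGOptions validation also insists on
data-type and label, even though the client itself doesn't use them):

    Map<String, String> conf = new HashMap<>();
    conf.put("host", "localhost");
    conf.put("port", "8080");
    conf.put("graph", "hugegraph");
    conf.put("data-type", "vertex");  // required by HGOptions, not by the client
    conf.put("label", "person");      // required by HGOptions, not by the client
    HugeClient client = HGClientHolder.create(new HGOptions(conf));
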
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/HGLoadContext.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/HGLoadContext.java
new file mode 100644
index 00000000..6201a022
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/HGLoadContext.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.client;
+
+import java.io.Serializable;
+
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.exception.ServerException;
+import org.apache.hugegraph.spark.connector.exception.LoadException;
+import org.apache.hugegraph.spark.connector.options.HGOptions;
+import org.apache.hugegraph.structure.constant.GraphMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class HGLoadContext implements Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HGLoadContext.class);
+
+    private final HGOptions options;
+
+    private final HugeClient client;
+
+    private final SchemaCache schemaCache;
+
+    public HGLoadContext(HGOptions options) {
+        this.options = options;
+        this.client = HGClientHolder.create(options);
+        this.schemaCache = new SchemaCache(this.client);
+    }
+
+    public HGOptions options() {
+        return this.options;
+    }
+
+    public HugeClient client() {
+        return this.client;
+    }
+
+    public SchemaCache schemaCache() {
+        return this.schemaCache;
+    }
+
+    public void updateSchemaCache() {
+        assert this.client != null;
+        this.schemaCache.updateAll();
+    }
+
+    public void setLoadingMode() {
+        String graph = this.client.graph().graph();
+        try {
+            this.client.graphs().mode(graph, GraphMode.LOADING);
+        } catch (ServerException e) {
+            if (e.getMessage().contains("Can not deserialize value of type")) {
+                LOG.warn("HugeGraphServer doesn't support loading mode");
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    public void unsetLoadingMode() {
+        try {
+            String graph = this.client.graph().graph();
+            GraphMode mode = this.client.graphs().mode(graph);
+            if (mode.loading()) {
+                this.client.graphs().mode(graph, GraphMode.NONE);
+            }
+        } catch (Exception e) {
+            throw new LoadException(String.format("Failed to unset mode %s for server",
+                                                  GraphMode.LOADING), e);
+        }
+    }
+
+    public void close() {
+        LOG.info("Write load progress successfully");
+        this.client.close();
+        LOG.info("Close HugeClient successfully");
+    }
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/SchemaCache.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/SchemaCache.java
new file mode 100644
index 00000000..3ac9a953
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/client/SchemaCache.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.client;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.exception.ServerException;
+import org.apache.hugegraph.spark.connector.exception.LoadException;
+import org.apache.hugegraph.structure.schema.EdgeLabel;
+import org.apache.hugegraph.structure.schema.PropertyKey;
+import org.apache.hugegraph.structure.schema.VertexLabel;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public final class SchemaCache {
+
+    private final HugeClient client;
+
+    private final Map<String, PropertyKey> propertyKeys;
+
+    private final Map<String, VertexLabel> vertexLabels;
+
+    private final Map<String, EdgeLabel> edgeLabels;
+
+    public SchemaCache(HugeClient client) {
+        this.client = client;
+        this.propertyKeys = new HashMap<>();
+        this.vertexLabels = new HashMap<>();
+        this.edgeLabels = new HashMap<>();
+    }
+
+    public SchemaCache(@JsonProperty(value = "propertykeys") List<PropertyKey> propertyKeyList,
+                       @JsonProperty(value = "vertexlabels") List<VertexLabel> vertexLabelList,
+                       @JsonProperty(value = "edgelabels") List<EdgeLabel> edgeLabelList) {
+        this.client = null;
+        this.propertyKeys = new HashMap<>();
+        this.vertexLabels = new HashMap<>();
+        this.edgeLabels = new HashMap<>();
+        propertyKeyList.forEach(pk -> this.propertyKeys.put(pk.name(), pk));
+        vertexLabelList.forEach(vl -> this.vertexLabels.put(vl.name(), vl));
+        edgeLabelList.forEach(el -> this.edgeLabels.put(el.name(), el));
+    }
+
+    public void updateAll() {
+        this.propertyKeys.clear();
+        client.schema().getPropertyKeys().forEach(pk -> this.propertyKeys.put(pk.name(), pk));
+        this.vertexLabels.clear();
+        client.schema().getVertexLabels().forEach(vl -> this.vertexLabels.put(vl.name(), vl));
+        this.edgeLabels.clear();
+        client.schema().getEdgeLabels().forEach(el -> this.edgeLabels.put(el.name(), el));
+    }
+
+    public PropertyKey getPropertyKey(String name) {
+        PropertyKey propertyKey = this.propertyKeys.get(name);
+        if (propertyKey == null) {
+            try {
+                propertyKey = this.client.schema().getPropertyKey(name);
+            } catch (ServerException e) {
+                throw new LoadException(String.format("The property key '%s' doesn't exist", name));
+            }
+        }
+        return propertyKey;
+    }
+
+    public VertexLabel getVertexLabel(String name) {
+        VertexLabel vertexLabel = this.vertexLabels.get(name);
+        if (vertexLabel == null) {
+            try {
+                vertexLabel = this.client.schema().getVertexLabel(name);
+            } catch (ServerException e) {
+                throw new LoadException(String.format("The vertex label '%s' doesn't exist", name));
+            }
+        }
+        return vertexLabel;
+    }
+
+    public EdgeLabel getEdgeLabel(String name) {
+        EdgeLabel edgeLabel = this.edgeLabels.get(name);
+        if (edgeLabel == null) {
+            try {
+                edgeLabel = this.client.schema().getEdgeLabel(name);
+            } catch (ServerException e) {
+                throw new LoadException(String.format("The edge label '%s' doesn't exist", name));
+            }
+        }
+        return edgeLabel;
+    }
+
+    public boolean isEmpty() {
+        return this.propertyKeys.isEmpty() && this.vertexLabels.isEmpty() &&
+               this.edgeLabels.isEmpty();
+    }
+}
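
The intended usage is two-phase: bulk-load the whole schema once, then serve
lookups from memory, falling back to a live request on a cache miss. A minimal
sketch, assuming client is an already-connected HugeClient:

    SchemaCache cache = new SchemaCache(client);
    cache.updateAll();                                    // one bulk fetch of all schema elements
    VertexLabel person = cache.getVertexLabel("person");  // cached, or looked up live on a miss
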
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/constant/Constants.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/constant/Constants.java
new file mode 100644
index 00000000..bcf399b2
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/constant/Constants.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.constant;
+
+import java.nio.charset.Charset;
+
+import com.google.common.base.Charsets;
+
+public final class Constants {
+
+    public static final Charset CHARSET = Charsets.UTF_8;
+
+    public static final String HTTP_PREFIX = "http://";
+
+    public static final String HTTPS_PREFIX = "https://";
+
+    public static final String TRUST_STORE_PATH = "conf/hugegraph.truststore";
+
+    public static final String COLON_STR = ":";
+
+    public static final String COMMA_STR = ",";
+
+    public static final int STATUS_UNAUTHORIZED = 401;
+
+    public static final int VERTEX_ID_LIMIT = 128;
+
+    public static final String[] SEARCH_LIST = new String[]{":", "!"};
+
+    public static final String[] TARGET_LIST = new String[]{"`:", "`!"};
+
+    public static final String TIMESTAMP = "timestamp";
+
+    public static final String TIME_ZONE = "GMT+8";
+
+    public static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+    public static final String DEFAULT_TRUST_STORE_TOKEN = "hugegraph";
+
+    public static final String CONNECTOR_HOME_PATH = "connector.home.path";
+
+    public static final String HTTPS_SCHEMA = "https";
+
+    public static final String HTTP_SCHEMA = "http";
+
+    public static final String DEFAULT_HOST = "localhost";
+
+    public static final int DEFAULT_PORT = 8080;
+
+    public static final String DEFAULT_GRAPH = "hugegraph";
+
+    public static final String DEFAULT_PROTOCOL = HTTP_SCHEMA;
+
+    public static final int DEFAULT_TIMEOUT = 60;
+
+    public static final int DEFAULT_BATCH_SIZE = 500;
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/constant/DataTypeEnum.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/constant/DataTypeEnum.java
new file mode 100644
index 00000000..ad0769f3
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/constant/DataTypeEnum.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.constant;
+
+public enum DataTypeEnum {
+
+    VERTEX("vertex"),
+
+    EDGE("edge");
+
+    private final String type;
+
+    DataTypeEnum(String type) {
+        this.type = type;
+    }
+
+    public String dataType() {
+        return this.type;
+    }
+
+    public static boolean isVertex(String type) {
+        return VERTEX.type.equalsIgnoreCase(type);
+    }
+
+    public static boolean isEdge(String type) {
+        return EDGE.type.equalsIgnoreCase(type);
+    }
+
+    public static boolean validDataType(String type) {
+        return isVertex(type) || isEdge(type);
+    }
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/exception/LoadException.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/exception/LoadException.java
new file mode 100644
index 00000000..a5272563
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/exception/LoadException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.exception;
+
+public class LoadException extends RuntimeException {
+
+    private static final long serialVersionUID = 5504623124963497613L;
+
+    public LoadException(String message) {
+        super(message);
+    }
+
+    public LoadException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public LoadException(String message, Object... args) {
+        super(String.format(message, args));
+    }
+
+    public LoadException(String message, Throwable cause, Object... args) {
+        super(String.format(message, args), cause);
+    }
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/EdgeMapping.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/EdgeMapping.java
new file mode 100644
index 00000000..f12c73e2
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/EdgeMapping.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.mapping;
+
+import java.util.List;
+
+import org.apache.hugegraph.spark.connector.constant.DataTypeEnum;
+import org.apache.hugegraph.util.E;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class EdgeMapping extends ElementMapping {
+
+    @JsonProperty("source")
+    private final List<String> sourceFields;
+
+    @JsonProperty("target")
+    private final List<String> targetFields;
+
+    public EdgeMapping(List<String> sourceFields, List<String> targetFields) {
+        this.sourceFields = sourceFields;
+        this.targetFields = targetFields;
+    }
+
+    @Override
+    public DataTypeEnum type() {
+        return DataTypeEnum.EDGE;
+    }
+
+    @Override
+    public void check() throws IllegalArgumentException {
+        super.check();
+        E.checkArgument(this.sourceFields != null && !this.sourceFields.isEmpty(),
+                        "The source field of edge label '%s' can't be null or empty", this.label());
+        E.checkArgument(this.targetFields != null && !this.targetFields.isEmpty(),
+                        "The target field of edge label '%s' can't be null or empty", this.label());
+    }
+
+    public List<String> sourceFields() {
+        return this.sourceFields;
+    }
+
+    public List<String> targetFields() {
+        return this.targetFields;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("edge-mapping(label=%s)", this.label());
+    }
+}
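
A small construction sketch (the column names are illustrative only):

    EdgeMapping mapping = new EdgeMapping(Arrays.asList("source_name"),
                                          Arrays.asList("target_name"));
    mapping.label("knows");
    mapping.check();  // fails fast if the source/target field lists are null or empty
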
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/ElementMapping.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/ElementMapping.java
new file mode 100644
index 00000000..4bcdb40e
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/ElementMapping.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.mapping;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hugegraph.spark.connector.constant.DataTypeEnum;
+import org.apache.hugegraph.util.E;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+
+@JsonPropertyOrder({"label"})
+public abstract class ElementMapping implements Serializable {
+
+    @JsonProperty("label")
+    private String label;
+
+    @JsonProperty("field_mapping")
+    private Map<String, String> mappingFields;
+
+    @JsonProperty("value_mapping")
+    private Map<String, Map<String, Object>> mappingValues;
+
+    @JsonProperty("selected")
+    private Set<String> selectedFields;
+
+    @JsonProperty("ignored")
+    private Set<String> ignoredFields;
+
+    @JsonProperty("batch_size")
+    private int batchSize;
+
+    public ElementMapping() {
+        this.mappingFields = new HashMap<>();
+        this.mappingValues = new HashMap<>();
+        this.selectedFields = new HashSet<>();
+        this.ignoredFields = new HashSet<>();
+        this.batchSize = 0;
+    }
+
+    public abstract DataTypeEnum type();
+
+    public void check() throws IllegalArgumentException {
+        this.mappingFields.values().forEach(value -> {
+            E.checkArgument(value != null, "The value in field_mapping can't be null");
+        });
+        this.mappingValues.values().forEach(m -> m.values().forEach(value -> {
+            E.checkArgument(value != null, "The value in value_mapping can't be null");
+        }));
+    }
+
+    public String label() {
+        return this.label;
+    }
+
+    public void label(String label) {
+        this.label = label;
+    }
+
+    public Map<String, String> mappingFields() {
+        return this.mappingFields;
+    }
+
+    public void mappingFields(Map<String, String> mappingFields) {
+        this.mappingFields = mappingFields;
+    }
+
+    public String mappingField(String fieldName) {
+        if (this.mappingFields.isEmpty()) {
+            return fieldName;
+        }
+        String mappingName = this.mappingFields.get(fieldName);
+        return mappingName != null ? mappingName : fieldName;
+    }
+
+    public Map<String, Map<String, Object>> mappingValues() {
+        return this.mappingValues;
+    }
+
+    public void mappingValues(Map<String, Map<String, Object>> mappingValues) {
+        this.mappingValues = mappingValues;
+    }
+
+    public Object mappingValue(String fieldName, String rawValue) {
+        if (this.mappingValues.isEmpty()) {
+            return rawValue;
+        }
+        Object mappingValue = rawValue;
+        Map<String, Object> values = this.mappingValues.get(fieldName);
+        if (values != null) {
+            Object value = values.get(rawValue);
+            if (value != null) {
+                mappingValue = value;
+            }
+        }
+        return mappingValue;
+    }
+
+    public int batchSize() {
+        return this.batchSize;
+    }
+
+    public void batchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public Set<String> selectedFields() {
+        return this.selectedFields;
+    }
+
+    public void selectedFields(Set<String> selectedFields) {
+        this.selectedFields = selectedFields;
+    }
+
+    public Set<String> ignoredFields() {
+        return this.ignoredFields;
+    }
+
+    public void ignoredFields(Set<String> ignoredFields) {
+        this.ignoredFields = ignoredFields;
+    }
+}
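
The field_mapping and value_mapping semantics are easiest to see by example; a
sketch with illustrative names:

    VertexMapping mapping = new VertexMapping("id");
    mapping.label("person");

    Map<String, String> fields = new HashMap<>();
    fields.put("csv_name", "name");        // rename column csv_name -> name
    mapping.mappingFields(fields);

    Map<String, Object> gender = new HashMap<>();
    gender.put("1", "male");               // translate raw cell value "1" -> "male"
    Map<String, Map<String, Object>> values = new HashMap<>();
    values.put("gender", gender);
    mapping.mappingValues(values);

    mapping.mappingField("csv_name");      // -> "name"
    mapping.mappingField("age");           // -> "age" (no mapping, passes through)
    mapping.mappingValue("gender", "1");   // -> "male"
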
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/VertexMapping.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/VertexMapping.java
new file mode 100644
index 00000000..9a9928d5
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/mapping/VertexMapping.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.mapping;
+
+import org.apache.hugegraph.spark.connector.constant.DataTypeEnum;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class VertexMapping extends ElementMapping {
+
+    @JsonProperty("id")
+    private final String idField;
+
+    public VertexMapping(String idField) {
+        this.idField = idField;
+    }
+
+    @Override
+    public DataTypeEnum type() {
+        return DataTypeEnum.VERTEX;
+    }
+
+    @Override
+    public void check() throws IllegalArgumentException {
+        super.check();
+    }
+
+    public String idField() {
+        return this.idField;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("vertex-mapping(label=%s)", this.label());
+    }
+}
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/options/HGOptions.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/options/HGOptions.java
new file mode 100644
index 00000000..20a5b2cd
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/options/HGOptions.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.options;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hugegraph.spark.connector.constant.Constants;
+import org.apache.hugegraph.spark.connector.constant.DataTypeEnum;
+import org.apache.hugegraph.util.E;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HGOptions implements Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HGOptions.class);
+
+    private static final int CPUS = Runtime.getRuntime().availableProcessors();
+
+    /* Client Configs */
+    public static final String HOST = "host";
+    public static final String PORT = "port";
+    public static final String GRAPH = "graph";
+    public static final String PROTOCOL = "protocol";
+    public static final String USERNAME = "username";
+    public static final String TOKEN = "token";
+    public static final String TIMEOUT = "timeout";
+    public static final String MAX_CONNECTIONS = "max-conn";
+    public static final String MAX_CONNECTIONS_PER_ROUTE = "max-conn-per-route";
+    public static final String TRUST_STORE_FILE = "trust-store-file";
+    public static final String TRUST_STORE_TOKEN = "trust-store-token";
+
+    /* Graph Data Configs */
+    public static final String DATA_TYPE = "data-type";
+    public static final String LABEL = "label";
+    public static final String ID_FIELD = "id";
+    public static final String SOURCE_NAME = "source-name";
+    public static final String TARGET_NAME = "target-name";
+    public static final String SELECTED_FIELDS = "selected-fields";
+    public static final String IGNORED_FIELDS = "ignored-fields";
+    public static final String BATCH_SIZE = "batch-size";
+
+    /* Common Configs */
+    public static final String DELIMITER = "delimiter";
+
+    public Map<String, String> parameters;
+
+    public HGOptions(Map<String, String> options) {
+        parameters = options.entrySet()
+                            .stream()
+                            .collect(Collectors.toMap(e -> e.getKey().toLowerCase().trim(),
+                                                      Map.Entry::getValue));
+        checkRequiredConf();
+        setDefaultConf();
+        checkFieldsConflict();
+        LOG.info("HugeGraph Spark Connector Configs: {}", parameters);
+    }
+
+    private void checkRequiredConf() {
+        String dataType = parameters.getOrDefault(DATA_TYPE, null);
+        E.checkArgument(DataTypeEnum.validDataType(dataType),
+                        "DataType must be set, either vertex " +
+                        "or edge, but got %s.", dataType);
+
+        String label = parameters.getOrDefault(LABEL, null);
+        E.checkArgument(!StringUtils.isEmpty(label),
+                        "Label must be set, but got %s.", label);
+
+        if (DataTypeEnum.isEdge(dataType)) {
+            String sourceNames = parameters.getOrDefault(SOURCE_NAME, null);
+            E.checkArgument(!StringUtils.isEmpty(sourceNames), "source-names must be set " +
+                                                               "when datatype is edge, but got " +
+                                                               "%s.", sourceNames);
+            String targetNames = parameters.getOrDefault(TARGET_NAME, null);
+            E.checkArgument(!StringUtils.isEmpty(targetNames), "target-names must be set " +
+                                                               "when datatype is edge, but got " +
+                                                               "%s.", targetNames);
+            LOG.info("Edge, Label is {}, source is {}, target is {}.",
+                     label, sourceNames, targetNames);
+        } else {
+            String idField = parameters.getOrDefault(ID_FIELD, null);
+            if (Objects.nonNull(idField)) {
+                LOG.info("Vertex, Label is {}, id is {}, id strategy is {}", label, idField,
+                         "customize");
+            } else {
+                LOG.info("Vertex, Label is {}, id is {}, id strategy is {}", label, null,
+                         "PrimaryKey");
+            }
+        }
+    }
+
+    private void checkFieldsConflict() {
+        Set<String> selectSet = selectedFields();
+        Set<String> ignoreSet = ignoredFields();
+        E.checkArgument(selectSet.isEmpty() || ignoreSet.isEmpty(),
+                        "Not allowed to specify selected(%s) and ignored(%s) " +
+                        "fields at the same time, at least one of them " +
+                        "must be empty", selectSet, ignoreSet);
+    }
+
+    private void setDefaultConf() {
+        setDefaultValueWithMsg(HOST, Constants.DEFAULT_HOST,
+                               String.format("Host not set, use default host: %s instead.",
+                                             Constants.DEFAULT_HOST));
+        setDefaultValueWithMsg(PORT, String.valueOf(Constants.DEFAULT_PORT),
+                               String.format("Port not set, use default port: %s instead.",
+                                             Constants.DEFAULT_PORT));
+        setDefaultValueWithMsg(GRAPH, Constants.DEFAULT_GRAPH,
+                               String.format("Graph not set, use default graph: %s instead.",
+                                             Constants.DEFAULT_GRAPH));
+
+        setDefaultValue(PROTOCOL, Constants.DEFAULT_PROTOCOL);
+        setDefaultValue(USERNAME, null);
+        setDefaultValue(TOKEN, null);
+        setDefaultValue(TIMEOUT, String.valueOf(Constants.DEFAULT_TIMEOUT));
+        setDefaultValue(MAX_CONNECTIONS, String.valueOf(CPUS * 4));
+        setDefaultValue(MAX_CONNECTIONS_PER_ROUTE, String.valueOf(CPUS * 2));
+        setDefaultValue(TRUST_STORE_FILE, null);
+        setDefaultValue(TRUST_STORE_TOKEN, null);
+
+        setDefaultValue(BATCH_SIZE, String.valueOf(Constants.DEFAULT_BATCH_SIZE));
+        setDefaultValue(DELIMITER, Constants.COMMA_STR);
+        setDefaultValue(SELECTED_FIELDS, "");
+        setDefaultValue(IGNORED_FIELDS, "");
+    }
+
+    private void setDefaultValue(String key, String value) {
+        if (!parameters.containsKey(key)) {
+            parameters.put(key, value);
+        }
+    }
+
+    private void setDefaultValueWithMsg(String key, String value, String msg) {
+        if (!parameters.containsKey(key)) {
+            LOG.info(msg);
+            parameters.put(key, value);
+        }
+    }
+
+    public Map<String, String> getAllParameters() {
+        return parameters;
+    }
+
+    public String getConfValue(String confKey) {
+        String lowerConfKey = confKey.toLowerCase();
+        return parameters.getOrDefault(lowerConfKey, null);
+    }
+
+    public String host() {
+        return getConfValue(HOST);
+    }
+
+    public int port() {
+        return Integer.parseInt(getConfValue(PORT));
+    }
+
+    public String graph() {
+        return getConfValue(GRAPH);
+    }
+
+    public String protocol() {
+        return getConfValue(PROTOCOL);
+    }
+
+    public String username() {
+        return getConfValue(USERNAME);
+    }
+
+    public String token() {
+        return getConfValue(TOKEN);
+    }
+
+    public int timeout() {
+        return Integer.parseInt(getConfValue(TIMEOUT));
+    }
+
+    public int maxConnection() {
+        return Integer.parseInt(getConfValue(MAX_CONNECTIONS));
+    }
+
+    public int maxConnectionPerRoute() {
+        return Integer.parseInt(getConfValue(MAX_CONNECTIONS_PER_ROUTE));
+    }
+
+    public String trustStoreFile() {
+        return getConfValue(TRUST_STORE_FILE);
+    }
+
+    public String trustStoreToken() {
+        return getConfValue(TRUST_STORE_TOKEN);
+    }
+
+    public String dataType() {
+        return getConfValue(DATA_TYPE);
+    }
+
+    public String label() {
+        return getConfValue(LABEL);
+    }
+
+    public String idField() {
+        return getConfValue(ID_FIELD);
+    }
+
+    public List<String> sourceName() {
+        return splitStr(getConfValue(SOURCE_NAME));
+    }
+
+    public List<String> targetName() {
+        return splitStr(getConfValue(TARGET_NAME));
+    }
+
+    public Set<String> selectedFields() {
+        String selectStr = getConfValue(SELECTED_FIELDS);
+        if (StringUtils.isEmpty(selectStr)) {
+            return new HashSet<>();
+        }
+        return new HashSet<>(splitStr(selectStr, delimiter()));
+    }
+
+    public Set<String> ignoredFields() {
+        String ignoreStr = getConfValue(IGNORED_FIELDS);
+        if (StringUtils.isEmpty(ignoreStr)) {
+            return new HashSet<>();
+        }
+        return new HashSet<>(splitStr(ignoreStr, delimiter()));
+    }
+
+    public int batchSize() {
+        return Integer.parseInt(getConfValue(BATCH_SIZE));
+    }
+
+    public String delimiter() {
+        return getConfValue(DELIMITER);
+    }
+
+    private List<String> splitStr(String str) {
+        return splitStr(str, Constants.COMMA_STR);
+    }
+
+    private List<String> splitStr(String str, String delimiter) {
+        return Arrays.asList(str.split(delimiter));
+    }
+}
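
To show how these options surface to users, a hedged sketch of a Spark batch
write (Java API); spark is an existing SparkSession, the format string is
this module's DataSource class, and all option values are assumptions:

    Dataset<Row> df = spark.read().option("header", "true").csv("person.csv");
    df.write()
      .format("org.apache.hugegraph.spark.connector.DataSource")
      .option("host", "localhost")
      .option("port", "8080")
      .option("graph", "hugegraph")
      .option("data-type", "vertex")
      .option("label", "person")
      .option("id", "name")
      .option("batch-size", "500")
      .mode("append")
      .save();
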
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/DataTypeUtils.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/DataTypeUtils.java
new file mode 100644
index 00000000..317e7568
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/DataTypeUtils.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.utils;
+
+import java.util.Date;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.hugegraph.spark.connector.constant.Constants;
+import org.apache.hugegraph.spark.connector.exception.LoadException;
+import org.apache.hugegraph.structure.constant.Cardinality;
+import org.apache.hugegraph.structure.constant.DataType;
+import org.apache.hugegraph.structure.schema.PropertyKey;
+import org.apache.hugegraph.util.E;
+
+import com.google.common.collect.ImmutableSet;
+
+public final class DataTypeUtils {
+
+    private static final Set<String> ACCEPTABLE_TRUE = ImmutableSet.of(
+            "true", "1", "yes", "y"
+    );
+
+    private static final Set<String> ACCEPTABLE_FALSE = ImmutableSet.of(
+            "false", "0", "no", "n"
+    );
+
+    public static Object convert(Object value, PropertyKey propertyKey) {
+        E.checkArgumentNotNull(value, "The value to be converted can't be null");
+
+        String key = propertyKey.name();
+        DataType dataType = propertyKey.dataType();
+        Cardinality cardinality = propertyKey.cardinality();
+        switch (cardinality) {
+            case SINGLE:
+                return parseSingleValue(key, value, dataType);
+            case SET:
+            case LIST:
+                throw new LoadException("Not support yet.");
+            default:
+                throw new AssertionError(String.format("Unsupported cardinality: '%s'",
+                                                       cardinality));
+        }
+    }
+
+    private static Object parseSingleValue(String key, Object rawValue, DataType dataType) {
+        // Trim space if raw value is string
+        Object value = rawValue;
+        if (rawValue instanceof String) {
+            value = ((String) rawValue).trim();
+        }
+        if (dataType.isNumber()) {
+            return parseNumber(key, value, dataType);
+        } else if (dataType.isBoolean()) {
+            return parseBoolean(key, value);
+        } else if (dataType.isDate()) {
+            String dateFormat = Constants.DATE_FORMAT;
+            String timeZone = Constants.TIME_ZONE;
+            return parseDate(key, value, dateFormat, timeZone);
+        } else if (dataType.isUUID()) {
+            return parseUUID(key, value);
+        } else if (dataType.isText()) {
+            if (!(rawValue instanceof String)) {
+                value = rawValue.toString();
+            }
+        }
+        E.checkArgument(checkDataType(key, value, dataType),
+                        "The value(key='%s') '%s'(%s) is not match with " +
+                        "data type %s and can't convert to it",
+                        key, value, value.getClass(), dataType);
+        return value;
+    }
+
+    public static long parseNumber(String key, Object rawValue) {
+        if (rawValue instanceof Number) {
+            return ((Number) rawValue).longValue();
+        } else if (rawValue instanceof String) {
+            // trim() is a little time-consuming
+            return parseLong(((String) rawValue).trim());
+        }
+        throw new IllegalArgumentException(String.format("The value(key='%s') must can be casted" +
+                                                         " to Long, but got '%s'(%s)", key,
+                                                         rawValue, rawValue.getClass().getName()));
+    }
+
+    public static UUID parseUUID(String key, Object rawValue) {
+        if (rawValue instanceof UUID) {
+            return (UUID) rawValue;
+        } else if (rawValue instanceof String) {
+            String value = ((String) rawValue).trim();
+            if (value.contains("-")) {
+                return UUID.fromString(value);
+            }
+            // UUID represented by hex string
+            E.checkArgument(value.length() == 32,
+                            "Invalid UUID value(key='%s') '%s'", key, value);
+            String high = value.substring(0, 16);
+            String low = value.substring(16);
+            return new UUID(Long.parseUnsignedLong(high, 16),
+                            Long.parseUnsignedLong(low, 16));
+        }
+        throw new IllegalArgumentException(String.format("Failed to convert value(key='%s') " +
+                                                         "'%s'(%s) to UUID", key, rawValue,
+                                                         rawValue.getClass()));
+    }
+
+    private static Boolean parseBoolean(String key, Object rawValue) {
+        if (rawValue instanceof Boolean) {
+            return (Boolean) rawValue;
+        }
+        if (rawValue instanceof String) {
+            String value = ((String) rawValue).toLowerCase();
+            if (ACCEPTABLE_TRUE.contains(value)) {
+                return true;
+            } else if (ACCEPTABLE_FALSE.contains(value)) {
+                return false;
+            } else {
+                throw new IllegalArgumentException(String.format(
+                        "Failed to convert '%s'(key='%s') to Boolean, " +
+                        "the acceptable boolean strings are %s or %s",
+                        rawValue, key, ACCEPTABLE_TRUE, ACCEPTABLE_FALSE));
+            }
+        }
+        throw new IllegalArgumentException(String.format("Failed to convert value(key='%s') " +
+                                                         "'%s'(%s) to Boolean", key, rawValue,
+                                                         rawValue.getClass()));
+    }
+
+    private static Number parseNumber(String key, Object value, DataType dataType) {
+        E.checkState(dataType.isNumber(), "The target data type must be number");
+
+        if (dataType.clazz().isInstance(value)) {
+            return (Number) value;
+        }
+        try {
+            switch (dataType) {
+                case BYTE:
+                    return Byte.valueOf(value.toString());
+                case INT:
+                    return Integer.valueOf(value.toString());
+                case LONG:
+                    return parseLong(value.toString());
+                case FLOAT:
+                    return Float.valueOf(value.toString());
+                case DOUBLE:
+                    return Double.valueOf(value.toString());
+                default:
+                    throw new AssertionError(String.format("Number type only contains Byte, " +
+                                                           "Integer, Long, Float, Double, " +
+                                                           "but got %s", dataType.clazz()));
+            }
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException(String.format("Failed to convert value(key=%s) " +
+                                                             "'%s'(%s) to Number", key, value,
+                                                             value.getClass()), e);
+        }
+    }
+
+    private static long parseLong(String rawValue) {
+        if (rawValue.startsWith("-")) {
+            return Long.parseLong(rawValue);
+        } else {
+            return Long.parseUnsignedLong(rawValue);
+        }
+    }
+
+    private static Date parseDate(String key, Object value, String dateFormat, String timeZone) {
+        if (value instanceof Date) {
+            return (Date) value;
+        }
+        if (value instanceof Number) {
+            return new Date(((Number) value).longValue());
+        } else if (value instanceof String) {
+            if (Constants.TIMESTAMP.equals(dateFormat)) {
+                try {
+                    long timestamp = Long.parseLong((String) value);
+                    return new Date(timestamp);
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException(String.format("Invalid timestamp value " +
+                                                                     "'%s'", value));
+                }
+            } else {
+                return DateUtils.parse((String) value, dateFormat, timeZone);
+            }
+        }
+        throw new IllegalArgumentException(String.format("Failed to convert value(key='%s') " +
+                                                         "'%s'(%s) to Date", key, value,
+                                                         value.getClass()));
+    }
+
+    /**
+     * Check whether the value's type is valid for the given data type
+     */
+    private static boolean checkDataType(String key, Object value, DataType dataType) {
+        if (value instanceof Number && dataType.isNumber()) {
+            return parseNumber(key, value, dataType) != null;
+        }
+        return dataType.clazz().isInstance(value);
+    }
+}
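
Two conversion behaviors worth illustrating with the public helpers above:

    DataTypeUtils.parseNumber("age", " 29 ");  // -> 29L, strings are trimmed first
    DataTypeUtils.parseUUID("uid", "550e8400e29b41d4a716446655440000");
    // a 32-char hex string is split into the high/low 64-bit halves of the UUID
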
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/DateUtils.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/DateUtils.java
new file mode 100644
index 00000000..763762df
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/DateUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.utils;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hugegraph.date.SafeDateFormat;
+
+public final class DateUtils {
+
+    private static final Map<String, SafeDateFormat> DATE_FORMATS = new ConcurrentHashMap<>();
+
+    public static Date parse(String source, String df, String timeZone) {
+        SafeDateFormat dateFormat = getDateFormat(df);
+        // parse date with specified timezone
+        dateFormat.setTimeZone(timeZone);
+        return dateFormat.parse(source);
+    }
+
+    private static SafeDateFormat getDateFormat(String df) {
+        SafeDateFormat dateFormat = DATE_FORMATS.get(df);
+        if (dateFormat == null) {
+            dateFormat = new SafeDateFormat(df);
+            SafeDateFormat previous = DATE_FORMATS.putIfAbsent(df, dateFormat);
+            if (previous != null) {
+                dateFormat = previous;
+            }
+        }
+        return dateFormat;
+    }
+}
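
The per-pattern cache avoids re-creating a format for every row; a one-line
usage sketch with the defaults from Constants:

    Date date = DateUtils.parse("2023-10-16 16:32:47", "yyyy-MM-dd HH:mm:ss", "GMT+8");
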
diff --git a/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/HGUtils.java b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/HGUtils.java
new file mode 100644
index 00000000..93df3bde
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/java/org/apache/hugegraph/spark/connector/utils/HGUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.utils;
+
+import java.util.List;
+
+import org.apache.hugegraph.spark.connector.mapping.EdgeMapping;
+import org.apache.hugegraph.spark.connector.mapping.VertexMapping;
+import org.apache.hugegraph.spark.connector.options.HGOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HGUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HGUtils.class);
+
+    public static VertexMapping vertexMappingFromConf(HGOptions hgOptions) {
+        String idField = hgOptions.idField();
+        VertexMapping vertexMapping = new VertexMapping(idField);
+
+        String label = hgOptions.label();
+        vertexMapping.label(label);
+        vertexMapping.batchSize(hgOptions.batchSize());
+        vertexMapping.selectedFields(hgOptions.selectedFields());
+        vertexMapping.ignoredFields(hgOptions.ignoredFields());
+        vertexMapping.check();
+
+        // TODO mappingFields, mappingValues, nullValues, updateStrategies
+        LOG.info("Update VertexMapping: {}", vertexMapping);
+        return vertexMapping;
+    }
+
+    public static EdgeMapping edgeMappingFromConf(HGOptions hgOptions) {
+        List<String> sourceNames = hgOptions.sourceName();
+        List<String> targetNames = hgOptions.targetName();
+        EdgeMapping edgeMapping = new EdgeMapping(sourceNames, targetNames);
+
+        String label = hgOptions.label();
+        edgeMapping.label(label);
+        edgeMapping.batchSize(hgOptions.batchSize());
+        edgeMapping.selectedFields(hgOptions.selectedFields());
+        edgeMapping.ignoredFields(hgOptions.ignoredFields());
+        edgeMapping.check();
+
+        // TODO mappingFields, mappingValues, nullValues, updateStrategies
+        LOG.info("Update EdgeMapping: {}", edgeMapping);
+        return edgeMapping;
+    }
+}
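
For reference, a minimal sketch of how these helpers are typically fed
(option keys follow the tests in this patch; the host/port values are
illustrative):

    import org.apache.hugegraph.spark.connector.options.HGOptions
    import org.apache.hugegraph.spark.connector.utils.HGUtils

    import scala.collection.JavaConverters.mapAsJavaMapConverter

    val configs = Map("host" -> "localhost", "port" -> "8080",
                      "data-type" -> "vertex", "label" -> "person", "id" -> "name")
    // Derives label, batch size and selected/ignored fields from the options.
    val mapping = HGUtils.vertexMappingFromConf(new HGOptions(configs.asJava))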
diff --git a/hugegraph-spark-connector/src/main/resources/log4j2.xml b/hugegraph-spark-connector/src/main/resources/log4j2.xml
new file mode 100644
index 00000000..b3a26404
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/resources/log4j2.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with this
+  work for additional information regarding copyright ownership. The ASF
+  licenses this file to You under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+  License for the specific language governing permissions and limitations
+  under the License.
+  -->
+<configuration status="error">
+    <Properties>
+        <property name="log-charset">UTF-8</property>
+    </Properties>
+    <appenders>
+        <Console name="console" target="SYSTEM_OUT">
+            <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+            <PatternLayout charset="${log-charset}" pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%-5p] %c{1.} %x - %m%n" />
+        </Console>
+
+        <RollingRandomAccessFile name="file" fileName="logs/hugegraph-spark-connector.log"
+                                 filePattern="logs/hugegraph-spark-connector-%d{yyyy-MM-dd}-%i.log">
+            <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+            <PatternLayout charset="${log-charset}" pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%-5p] %c{1.} %x - %m%n" />
+            <!-- Trigger after exceeding 1day or 100MB -->
+            <Policies>
+                <SizeBasedTriggeringPolicy size="100MB"/>
+                <TimeBasedTriggeringPolicy interval="1" modulate="true" />
+            </Policies>
+            <!-- Keep max 5 files per day & auto delete after over 1GB or 100 files -->
+            <DefaultRolloverStrategy max="5">
+                <Delete basePath="logs" maxDepth="2">
+                    <IfFileName glob="*/*.log"/>
+                    <!-- Limit log amount & size -->
+                    <IfAny>
+                        <IfAccumulatedFileSize exceeds="1GB" />
+                        <IfAccumulatedFileCount exceeds="100" />
+                    </IfAny>
+                </Delete>
+            </DefaultRolloverStrategy>
+        </RollingRandomAccessFile>
+    </appenders>
+    <loggers>
+        <root level="INFO">
+            <appender-ref ref="console"/>
+            <appender-ref ref="file"/>
+        </root>
+        <logger name="org.apache.cassandra" level="WARN" additivity="false">
+            <appender-ref ref="file"/>
+        </logger>
+        <logger name="org.apache.hadoop" level="WARN" additivity="false">
+            <appender-ref ref="file"/>
+        </logger>
+        <logger name="io.netty" level="WARN" additivity="false">
+            <appender-ref ref="file"/>
+        </logger>
+        <logger name="org.apache.commons" level="WARN" additivity="false">
+            <appender-ref ref="file"/>
+        </logger>
+        <logger name="org.apache.hugegraph" level="INFO" additivity="false">
+            <appender-ref ref="file"/>
+            <appender-ref ref="console"/>
+        </logger>
+    </loggers>
+</configuration>
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/DataSource.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/DataSource.scala
new file mode 100644
index 00000000..1183b7c3
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/DataSource.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector
+
+import org.apache.hugegraph.spark.connector.options.HGOptions
+import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.slf4j.LoggerFactory
+
+import java.util
+
+class DataSource extends TableProvider with DataSourceRegister {
+
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var schema: StructType = _
+
+  private var hgOptions: HGOptions = _
+
+  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+    hgOptions = new HGOptions(options.asCaseSensitiveMap())
+    LOG.info(s"HugeGraph Options: ${hgOptions.getAllParameters}")
+    schema = new StructType()
+    LOG.info(s"Writer infer schema: ${schema}")
+    schema
+  }
+
+  override def getTable(schema: StructType, partitioning: Array[Transform],
+                        properties: util.Map[String, String]): Table = {
+    LOG.info(s"Get table schema: ${schema}")
+    new HGTable(schema, hgOptions)
+  }
+
+  //  override def supportsExternalMetadata(): Boolean = true
+
+  override def shortName(): String = "HugeGraph"
+}
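
For reference, a minimal end-to-end write sketch against this provider
(a sketch only: it assumes a running HugeGraph server with a "person"
vertex label already defined, and addresses the source by its full class
name to avoid relying on service registration of the short name):

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().appName("hg-sink").master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq(("marko", 29, "Beijing")).toDF("name", "age", "city")
    df.write
      .format("org.apache.hugegraph.spark.connector.DataSource")
      .option("host", "localhost")
      .option("port", "8080")
      .option("data-type", "vertex")
      .option("label", "person")
      .option("id", "name")
      .mode(SaveMode.Append)
      .save()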
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/HGTable.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/HGTable.scala
new file mode 100644
index 00000000..28c67951
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/HGTable.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector
+
+import org.apache.hugegraph.spark.connector.options.HGOptions
+import org.apache.hugegraph.spark.connector.writer.HGWriterBuilder
+import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableCapability}
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+import java.util
+import scala.collection.JavaConverters.setAsJavaSetConverter
+
+class HGTable(schema: StructType, hgOptions: HGOptions) extends SupportsWrite {
+
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    LOG.info(s"User Config Options ${info.options().asCaseSensitiveMap()}")
+    LOG.info(s"Logical Write schema: ${info.schema()}")
+    new HGWriterBuilder(info.schema(), hgOptions)
+  }
+
+  override def name(): String = hgOptions.label()
+
+  override def schema(): StructType = schema
+
+  override def capabilities(): util.Set[TableCapability] =
+    Set(
+      TableCapability.BATCH_READ,
+      TableCapability.BATCH_WRITE,
+      TableCapability.ACCEPT_ANY_SCHEMA,
+      TableCapability.OVERWRITE_BY_FILTER,
+      TableCapability.OVERWRITE_DYNAMIC,
+      TableCapability.STREAMING_WRITE,
+      TableCapability.MICRO_BATCH_READ
+    ).asJava
+}
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/utils/HGBuildUtils.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/utils/HGBuildUtils.scala
new file mode 100644
index 00000000..bac8aa31
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/utils/HGBuildUtils.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.utils
+
+import org.apache.hugegraph.spark.connector.builder.{EdgeBuilder, VertexBuilder}
+import org.apache.hugegraph.spark.connector.client.HGLoadContext
+import org.apache.hugegraph.structure.graph.{Edge, Vertex}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, seqAsJavaListConverter}
+
+object HGBuildUtils {
+
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
+  def buildVertices(row: InternalRow, schema: StructType, builder: VertexBuilder): List[Vertex] = {
+    val fields = schema.names
+    val dataTypes = schema.fields.map(field => field.dataType)
+    val values = for {
+      idx <- schema.fields.indices
+    } yield {
+      val value = row.get(idx, dataTypes(idx))
+      if (value.getClass.getSimpleName.equalsIgnoreCase("UTF8String")) {
+        value.toString
+      } else {
+        value
+      }
+    }
+    LOG.info(s"Fields: ${fields.mkString(", ")}, values: ${values.mkString(", ")}")
+    builder.build(fields, values.toArray).asScala.toList
+  }
+
+  def buildEdges(row: InternalRow, schema: StructType, builder: EdgeBuilder): List[Edge] = {
+    val fields = schema.names
+    val dataTypes = schema.fields.map(field => field.dataType)
+    val values = for {
+      idx <- schema.fields.indices
+    } yield {
+      val value = row.get(idx, dataTypes(idx))
+      if (value.getClass.getSimpleName.equalsIgnoreCase("UTF8String")) {
+        value.toString
+      } else {
+        value
+      }
+    }
+    LOG.info(s"Fields: ${fields.mkString(", ")}, values: ${values.mkString(", ")}")
+    builder.build(fields, values.toArray).asScala.toList
+  }
+
+  def saveVertices(context: HGLoadContext, vertices: List[Vertex]): List[Vertex] = {
+    context.client().graph().addVertices(vertices.asJava).asScala.toList
+  }
+
+  def saveEdges(context: HGLoadContext, edges: List[Edge]): List[Edge] = {
+    context.client().graph().addEdges(edges.asJava).asScala.toList
+  }
+}
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGBatchWriter.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGBatchWriter.scala
new file mode 100644
index 00000000..7dd46ea1
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGBatchWriter.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.writer
+
+import org.apache.hugegraph.spark.connector.options.HGOptions
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+class HGBatchWriter(schema: StructType, hgOptions: HGOptions) extends BatchWrite {
+
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
+    new HGBatchWriterFactory(schema, hgOptions)
+  }
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    for (elem <- messages) {
+      val msg = elem.asInstanceOf[HGCommitMessage]
+      LOG.info(s"Commit message: ${msg}")
+    }
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    LOG.error("HugeGraph batch write aborted.")
+  }
+}
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGBatchWriterFactory.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGBatchWriterFactory.scala
new file mode 100644
index 00000000..ef50fc77
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGBatchWriterFactory.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.writer
+
+import org.apache.hugegraph.spark.connector.constant.DataTypeEnum
+import org.apache.hugegraph.spark.connector.options.HGOptions
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+class HGBatchWriterFactory(schema: StructType, hgOptions: HGOptions) extends DataWriterFactory {
+
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
+    val dataType = hgOptions.dataType()
+    LOG.info(s"Create a ${dataType} writer, partitionId: ${partitionId}, taskId: ${taskId}")
+    if (dataType == "vertex") {
+      new HGVertexWriter(schema, hgOptions)
+    }
+    else {
+      new HGEdgeWriter(schema, hgOptions)
+    }
+  }
+}
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGCommitMessage.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGCommitMessage.scala
new file mode 100644
index 00000000..ab419c64
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGCommitMessage.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.writer
+
+import org.apache.spark.sql.connector.write.WriterCommitMessage
+
+case class HGCommitMessage(successfulMsg: List[String]) extends WriterCommitMessage
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGEdgeWriter.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGEdgeWriter.scala
new file mode 100644
index 00000000..6f930ddd
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGEdgeWriter.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.writer
+
+import org.apache.hugegraph.spark.connector.builder.EdgeBuilder
+import org.apache.hugegraph.spark.connector.client.HGLoadContext
+import org.apache.hugegraph.spark.connector.mapping.EdgeMapping
+import org.apache.hugegraph.spark.connector.options.HGOptions
+import org.apache.hugegraph.spark.connector.utils.HGUtils
+import org.apache.hugegraph.spark.connector.utils.HGBuildUtils
+import org.apache.hugegraph.structure.graph.Edge
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable.ListBuffer
+
+class HGEdgeWriter(schema: StructType, hgOptions: HGOptions) extends DataWriter[InternalRow] {
+
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
+  val context = new HGLoadContext(hgOptions)
+  context.updateSchemaCache()
+  context.setLoadingMode()
+
+  val mapping: EdgeMapping = HGUtils.edgeMappingFromConf(hgOptions)
+  val builder = new EdgeBuilder(context, mapping)
+
+  private val edgesBuffer: ListBuffer[Edge] = new ListBuffer()
+
+  var cnt = 0
+
+  override def write(record: InternalRow): Unit = {
+    val edges = HGBuildUtils.buildEdges(record, schema, builder)
+
+    for (edge <- edges) {
+      edgesBuffer.+=(edge)
+    }
+
+    if (edgesBuffer.size >= hgOptions.batchSize()) {
+      sinkOnce()
+    }
+  }
+
+  private def sinkOnce(): Unit = {
+    LOG.info(s"Write a batch of edges: ${edgesBuffer.toList}")
+    val successfulEdges = HGBuildUtils.saveEdges(context, edgesBuffer.toList)
+    cnt += successfulEdges.length
+    edgesBuffer.clear()
+  }
+
+  override def commit(): WriterCommitMessage = {
+    if (edgesBuffer.nonEmpty) {
+      sinkOnce()
+    }
+    context.unsetLoadingMode()
+    HGCommitMessage(List("Success cnt: " + cnt))
+  }
+
+  override def abort(): Unit = {
+    context.unsetLoadingMode()
+    LOG.error("Load Task abort.")
+  }
+
+  override def close(): Unit = {
+    context.close()
+  }
+}
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGVertexWriter.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGVertexWriter.scala
new file mode 100644
index 00000000..e5823eb3
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGVertexWriter.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.writer
+
+import org.apache.hugegraph.spark.connector.builder.VertexBuilder
+import org.apache.hugegraph.spark.connector.client.HGLoadContext
+import org.apache.hugegraph.spark.connector.mapping.VertexMapping
+import org.apache.hugegraph.spark.connector.options.HGOptions
+import org.apache.hugegraph.spark.connector.utils.HGUtils
+import org.apache.hugegraph.spark.connector.utils.HGBuildUtils
+import org.apache.hugegraph.structure.graph.Vertex
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable.ListBuffer
+
+class HGVertexWriter(schema: StructType, hgOptions: HGOptions) extends DataWriter[InternalRow] {
+
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
+  val context = new HGLoadContext(hgOptions)
+  context.updateSchemaCache()
+  context.setLoadingMode()
+
+  val mapping: VertexMapping = HGUtils.vertexMappingFromConf(hgOptions)
+  val builder = new VertexBuilder(context, mapping)
+
+  private val verticesBuffer: ListBuffer[Vertex] = new ListBuffer()
+
+  var cnt = 0
+
+  override def write(record: InternalRow): Unit = {
+    val vertices = HGBuildUtils.buildVertices(record, schema, builder)
+
+    for (vertex <- vertices) {
+      verticesBuffer.+=(vertex)
+    }
+
+    if (verticesBuffer.size >= hgOptions.batchSize()) {
+      sinkOnce()
+    }
+  }
+
+  private def sinkOnce(): Unit = {
+    LOG.info(s"Write a batch of vertices: ${verticesBuffer.toList}")
+    val successfulVertices = HGBuildUtils.saveVertices(context, verticesBuffer.toList)
+    cnt += successfulVertices.length
+    verticesBuffer.clear()
+  }
+
+  override def commit(): WriterCommitMessage = {
+    if (verticesBuffer.nonEmpty) {
+      sinkOnce()
+    }
+    context.unsetLoadingMode()
+    HGCommitMessage(List("Success cnt: " + cnt))
+  }
+
+  override def abort(): Unit = {
+    context.unsetLoadingMode()
+    LOG.error("Load Task abort.")
+  }
+
+  override def close(): Unit = {
+    context.close()
+  }
+}
diff --git a/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGWriterBuilder.scala b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGWriterBuilder.scala
new file mode 100644
index 00000000..ac770b1c
--- /dev/null
+++ b/hugegraph-spark-connector/src/main/scala/org/apache/hugegraph/spark/connector/writer/HGWriterBuilder.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.writer
+
+import org.apache.hugegraph.spark.connector.options.HGOptions
+import org.apache.spark.sql.connector.write.{BatchWrite, SupportsOverwrite, SupportsTruncate, WriteBuilder}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+class HGWriterBuilder(schema: StructType, hgOptions: HGOptions)
+  extends WriteBuilder with SupportsOverwrite with SupportsTruncate {
+
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def buildForBatch(): BatchWrite = {
+    new HGBatchWriter(schema, hgOptions)
+  }
+
+  override def overwrite(filters: Array[Filter]): WriteBuilder = {
+    new HGWriterBuilder(schema, hgOptions)
+  }
+}
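
Similarly, a hedged sketch of the edge write path, continuing the Spark
session from the vertex sketch above (option keys match EdgeBuilderTest
below; the DataFrame columns and values are illustrative):

    // "source-name"/"target-name" name the columns that identify the
    // source and target vertices of each edge row.
    val edgeDf = Seq(("josh", "lop", "2009-11-11", 0.4))
      .toDF("v1-name", "name", "date", "weight")
    edgeDf.write
      .format("org.apache.hugegraph.spark.connector.DataSource")
      .option("host", "localhost")
      .option("port", "8080")
      .option("data-type", "edge")
      .option("label", "created")
      .option("source-name", "v1-name")
      .option("target-name", "name")
      .mode(SaveMode.Append)
      .save()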
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/builder/EdgeBuilderTest.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/builder/EdgeBuilderTest.java
new file mode 100644
index 00000000..c6e05c4a
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/builder/EdgeBuilderTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.builder;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.spark.connector.client.HGLoadContext;
+import org.apache.hugegraph.spark.connector.mapping.EdgeMapping;
+import org.apache.hugegraph.spark.connector.options.HGOptions;
+import org.apache.hugegraph.spark.connector.utils.HGEnvUtils;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.schema.EdgeLabel;
+import org.apache.hugegraph.testutil.Assert;
+import org.jetbrains.annotations.NotNull;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class EdgeBuilderTest {
+
+    private static HugeClient client;
+
+    @BeforeClass
+    public static void setUp() {
+        HGEnvUtils.createEnv();
+        client = HGEnvUtils.getHugeClient();
+    }
+
+    @Test
+    public void testEdgeIdFields() {
+        HGLoadContext context = getEdgeLoadContext();
+
+        EdgeMapping edgeMapping = new EdgeMapping(Collections.singletonList("v1-name"),
+                                                  Collections.singletonList("v2-name"));
+        edgeMapping.label("created");
+        EdgeBuilder edgeBuilder = new EdgeBuilder(context, edgeMapping);
+
+        EdgeLabel edgeLabel = (EdgeLabel) edgeBuilder.schemaLabel();
+        Assert.assertEquals("person", edgeLabel.sourceLabel());
+        Assert.assertEquals("software", edgeLabel.targetLabel());
+        Assert.assertTrue(edgeBuilder.isIdField("v1-name"));
+        Assert.assertTrue(edgeBuilder.isIdField("v2-name"));
+        Assert.assertEquals("edge-mapping(label=created)",
+                            edgeBuilder.mapping().toString());
+    }
+
+    @Test
+    public void testEdgeBuild() {
+        HGLoadContext context = getEdgeLoadContext();
+
+        EdgeMapping edgeMapping = new EdgeMapping(Collections.singletonList("v1-name"),
+                                                  Collections.singletonList("name"));
+        edgeMapping.label("created");
+        EdgeBuilder edgeBuilder = new EdgeBuilder(context, edgeMapping);
+
+        String[] names = {"v1-name", "name", "date", "weight"};
+        Object[] values = {"josh", "lop", "2009-11-11", 0.4};
+
+        List<Edge> edges = edgeBuilder.build(names, values);
+        Assert.assertEquals(1, edges.size());
+        Edge edge = edges.get(0);
+        Assert.assertEquals("josh", edge.sourceId());
+        Assert.assertEquals("2:lop", edge.targetId());
+    }
+
+    @NotNull
+    private static HGLoadContext getEdgeLoadContext() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("host", HGEnvUtils.DEFAULT_HOST);
+        configs.put("port", HGEnvUtils.DEFAULT_PORT);
+
+        configs.put("data-type", "edge");
+        configs.put("label", "created");
+        configs.put("source-name", "v1-name");
+        configs.put("target-name", "name");
+        HGOptions options = new HGOptions(configs);
+        HGLoadContext context = new HGLoadContext(options);
+        context.updateSchemaCache();
+        return context;
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        HGEnvUtils.destroyEnv();
+    }
+}
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/builder/VertexBuilderTest.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/builder/VertexBuilderTest.java
new file mode 100644
index 00000000..559bb031
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/builder/VertexBuilderTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.builder;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.spark.connector.client.HGLoadContext;
+import org.apache.hugegraph.spark.connector.mapping.VertexMapping;
+import org.apache.hugegraph.spark.connector.options.HGOptions;
+import org.apache.hugegraph.spark.connector.utils.HGEnvUtils;
+import org.apache.hugegraph.structure.graph.Vertex;
+import org.apache.hugegraph.structure.schema.VertexLabel;
+import org.apache.hugegraph.testutil.Assert;
+import org.jetbrains.annotations.NotNull;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class VertexBuilderTest {
+
+    private static HugeClient client;
+
+    @BeforeClass
+    public static void setUp() {
+        HGEnvUtils.createEnv();
+        client = HGEnvUtils.getHugeClient();
+    }
+
+    @Test
+    public void testCustomizeIdFields() {
+        HGLoadContext context = getCustomizeIdVertexContext();
+
+        VertexMapping vertexMapping = new VertexMapping("name");
+        vertexMapping.label("person");
+        VertexBuilder vertexBuilder = new VertexBuilder(context, vertexMapping);
+
+        VertexLabel vertexLabel = (VertexLabel) vertexBuilder.schemaLabel();
+        Assert.assertTrue(vertexLabel.idStrategy().isCustomizeString());
+
+        Assert.assertTrue(vertexBuilder.isIdField("name"));
+        System.out.println(vertexBuilder.mapping());
+        Assert.assertEquals("vertex-mapping(label=person)",
+                            vertexBuilder.mapping().toString());
+    }
+
+    @Test
+    public void testCustomizeWithNullIdField() {
+        HGLoadContext context = getCustomizeIdVertexContext();
+
+        VertexMapping vertexMapping = new VertexMapping(null);
+        vertexMapping.label("person");
+        Assert.assertThrows(IllegalStateException.class, () -> {
+            new VertexBuilder(context, vertexMapping);
+        });
+    }
+
+    @NotNull
+    private static HGLoadContext getCustomizeIdVertexContext() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("host", HGEnvUtils.DEFAULT_HOST);
+        configs.put("port", HGEnvUtils.DEFAULT_PORT);
+
+        configs.put("data-type", "vertex");
+        configs.put("label", "person");
+        configs.put("id", "name");
+        HGOptions options = new HGOptions(configs);
+        HGLoadContext context = new HGLoadContext(options);
+        context.updateSchemaCache();
+        return context;
+    }
+
+    @Test
+    public void testCustomizeBuildVertex() {
+        HGLoadContext context = getCustomizeIdVertexContext();
+
+        VertexMapping vertexMapping = new VertexMapping("name");
+        vertexMapping.label("person");
+        VertexBuilder vertexBuilder = new VertexBuilder(context, vertexMapping);
+
+        String[] names = {"name", "age", "city"};
+        Object[] values = {"marko", "29", "Beijing"};
+
+        List<Vertex> vertices = vertexBuilder.build(names, values);
+        Assert.assertEquals(1, vertices.size());
+        Vertex vertex = vertices.get(0);
+        Assert.assertEquals("marko", vertex.id());
+        Assert.assertEquals("person", vertex.label());
+        Map<String, Object> properties = vertex.properties();
+        for (int i = 0; i < names.length; i++) {
+            String name = names[i];
+            Assert.assertTrue(properties.containsKey(name));
+            Assert.assertEquals(values[i], String.valueOf(properties.get(name)));
+        }
+    }
+
+    @Test
+    public void testPrimaryIdFields() {
+        HGLoadContext context = getPrimaryIdVertexContext();
+        VertexMapping vertexMapping = new VertexMapping("name");
+        vertexMapping.label("software");
+        Assert.assertThrows(IllegalStateException.class, () -> {
+            new VertexBuilder(context, vertexMapping);
+        });
+    }
+
+    @Test
+    public void testPrimaryWithNullIdField() {
+        HGLoadContext context = getPrimaryIdVertexContext();
+        VertexMapping vertexMapping = new VertexMapping(null);
+        vertexMapping.label("software");
+        VertexBuilder vertexBuilder = new VertexBuilder(context, vertexMapping);
+        VertexLabel vertexLabel = (VertexLabel) vertexBuilder.schemaLabel();
+        Assert.assertTrue(vertexLabel.idStrategy().isPrimaryKey());
+        Assert.assertEquals("vertex-mapping(label=software)",
+                            vertexBuilder.mapping().toString());
+    }
+
+    @NotNull
+    private static HGLoadContext getPrimaryIdVertexContext() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("host", HGEnvUtils.DEFAULT_HOST);
+        configs.put("port", HGEnvUtils.DEFAULT_PORT);
+
+        configs.put("data-type", "vertex");
+        configs.put("label", "software");
+        HGOptions options = new HGOptions(configs);
+        HGLoadContext context = new HGLoadContext(options);
+        context.updateSchemaCache();
+        return context;
+    }
+
+    @Test
+    public void testPrimaryBuildVertex() {
+        HGLoadContext context = getCustomizeIdVertexContext();
+
+        VertexMapping vertexMapping = new VertexMapping(null);
+        vertexMapping.label("software");
+        VertexBuilder vertexBuilder = new VertexBuilder(context, vertexMapping);
+
+        String[] names = {"name", "lang", "price"};
+        Object[] values = {"lop", "java", "328.0"};
+        List<Vertex> vertices = vertexBuilder.build(names, values);
+        Assert.assertEquals(1, vertices.size());
+        Vertex vertex = vertices.get(0);
+        Assert.assertNull(vertex.id());
+        Assert.assertEquals("software", vertex.label());
+        Map<String, Object> properties = vertex.properties();
+        for (int i = 0; i < names.length; i++) {
+            String name = names[i];
+            Assert.assertTrue(properties.containsKey(name));
+            Assert.assertEquals(values[i], String.valueOf(properties.get(name)));
+        }
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        HGEnvUtils.destroyEnv();
+    }
+}
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/client/HGClientHolderTest.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/client/HGClientHolderTest.java
new file mode 100644
index 00000000..c0009418
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/client/HGClientHolderTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.client;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.spark.connector.options.HGOptions;
+import org.apache.hugegraph.spark.connector.utils.HGEnvUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HGClientHolderTest {
+
+    @Test
+    public void testHGClientHolder() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("host", HGEnvUtils.DEFAULT_HOST);
+        configs.put("port", HGEnvUtils.DEFAULT_PORT);
+
+        configs.put("data-type", "vertex");
+        configs.put("label", "person");
+        configs.put("id", "name");
+        HGOptions options = new HGOptions(configs);
+        HugeClient client = HGClientHolder.create(options);
+        Assert.assertNotNull(client);
+        client.close();
+    }
+}
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/mapping/EdgeMappingTest.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/mapping/EdgeMappingTest.java
new file mode 100644
index 00000000..cf141058
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/mapping/EdgeMappingTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.mapping;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hugegraph.testutil.Assert;
+import org.junit.Test;
+
+public class EdgeMappingTest {
+
+    @Test
+    public void testBasicEdgeMapping() {
+        List<String> sourceFields = Arrays.asList("id", "name", "age");
+        List<String> targetFields = Arrays.asList("id", "ISBN", "price");
+        EdgeMapping edgeMapping = new EdgeMapping(sourceFields, targetFields);
+        edgeMapping.label("buy");
+
+        Assert.assertArrayEquals(new String[]{"id", "name", "age"}, sourceFields.toArray());
+        Assert.assertArrayEquals(new String[]{"id", "ISBN", "price"}, targetFields.toArray());
+
+        Assert.assertEquals("edge", edgeMapping.type().dataType());
+        Assert.assertEquals("edge-mapping(label=buy)", edgeMapping.toString());
+    }
+
+    @Test
+    public void testEdgeMappingSourceIsNull() {
+        List<String> targetFields = Arrays.asList("id", "ISBN", "price");
+        EdgeMapping edgeMapping = new EdgeMapping(null, targetFields);
+        edgeMapping.label("buy");
+        testEdgeMappingException(edgeMapping);
+    }
+
+    @Test
+    public void testEdgeMappingSourceIsEmpty() {
+        List<String> sourceFields = new ArrayList<>();
+        List<String> targetFields = Arrays.asList("id", "ISBN", "price");
+        EdgeMapping edgeMapping = new EdgeMapping(sourceFields, targetFields);
+        edgeMapping.label("buy");
+        testEdgeMappingException(edgeMapping);
+    }
+
+    @Test
+    public void testEdgeMappingTargetIsNull() {
+        List<String> sourceFields = Arrays.asList("id", "name", "age");
+        EdgeMapping edgeMapping = new EdgeMapping(sourceFields, null);
+        edgeMapping.label("buy");
+        testEdgeMappingException(edgeMapping);
+    }
+
+    @Test
+    public void testEdgeMappingTargetIsEmpty() {
+        List<String> sourceFields = Arrays.asList("id", "name", "age");
+        List<String> targetFields = new ArrayList<>();
+        EdgeMapping edgeMapping = new EdgeMapping(sourceFields, targetFields);
+        edgeMapping.label("buy");
+        testEdgeMappingException(edgeMapping);
+    }
+
+    private void testEdgeMappingException(EdgeMapping edgeMapping) {
+        Assert.assertThrows(IllegalArgumentException.class, edgeMapping::check);
+    }
+
+    @Test
+    public void testAdvancedEdgeMapping() {
+        List<String> sourceFields = Arrays.asList("id", "name", "age");
+        List<String> targetFields = Arrays.asList("id", "ISBN", "price");
+        EdgeMapping edgeMapping = new EdgeMapping(sourceFields, targetFields);
+        edgeMapping.label("buy");
+        edgeMapping.batchSize(100);
+        Assert.assertEquals(100, edgeMapping.batchSize());
+
+        edgeMapping.selectedFields(new HashSet<>(Arrays.asList("id", "name", "lang", "price")));
+        edgeMapping.ignoredFields(new HashSet<>(Arrays.asList("ISBN", "age")));
+
+        String[] selectFields = edgeMapping.selectedFields().toArray(new String[0]);
+        Assert.assertArrayEquals(new String[]{"price", "name", "id", "lang"}, selectFields);
+        String[] ignoredFields = edgeMapping.ignoredFields().toArray(new String[0]);
+        Assert.assertArrayEquals(new String[]{"ISBN", "age"}, ignoredFields);
+
+        HashMap<String, String> fieldMapping = new HashMap<>();
+        fieldMapping.put("source_name", "name");
+        fieldMapping.put("target_name", "name");
+        edgeMapping.mappingFields(fieldMapping);
+    }
+
+    @Test
+    public void testMappingFieldValueNotExists() {
+        List<String> sourceFields = Arrays.asList("id", "name", "age");
+        List<String> targetFields = Arrays.asList("id", "ISBN", "price");
+        EdgeMapping edgeMapping = new EdgeMapping(sourceFields, targetFields);
+        edgeMapping.label("buy");
+
+        HashMap<String, String> fieldMapping = new HashMap<>();
+        fieldMapping.put("source_name", "name");
+        edgeMapping.mappingFields(fieldMapping);
+
+        Assert.assertEquals("new_source_name", edgeMapping.mappingField("new_source_name"));
+    }
+
+    @Test
+    public void testMappingFieldValueIsNull() {
+        List<String> sourceFields = Arrays.asList("id", "name", "age");
+        List<String> targetFields = Arrays.asList("id", "ISBN", "price");
+        EdgeMapping edgeMapping = new EdgeMapping(sourceFields, targetFields);
+        edgeMapping.label("buy");
+
+        HashMap<String, String> fieldMapping = new HashMap<>();
+        // The value in field_mapping can't be null
+        fieldMapping.put("source_name", null);
+        edgeMapping.mappingFields(fieldMapping);
+
+        Assert.assertThrows(IllegalArgumentException.class, edgeMapping::check);
+    }
+
+    @Test
+    public void testMappingValue() {
+        List<String> sourceFields = Arrays.asList("id", "name", "age");
+        List<String> targetFields = Arrays.asList("id", "ISBN", "price");
+        EdgeMapping edgeMapping = new EdgeMapping(sourceFields, targetFields);
+        edgeMapping.label("buy");
+
+        Map<String, Object> cityMap = new HashMap<>();
+        cityMap.put("1", "Beijing");
+        cityMap.put("2", "Shanghai");
+        Map<String, Object> ageMap = new HashMap<>();
+        ageMap.put("young", 18);
+        ageMap.put("middle", 35);
+
+        Map<String, Map<String, Object>> valueMapping = new HashMap<>();
+        valueMapping.put("city", cityMap);
+        valueMapping.put("age", ageMap);
+
+        edgeMapping.mappingValues(valueMapping);
+
+        Assert.assertEquals("Beijing", edgeMapping.mappingValue("city", "1"));
+        Assert.assertEquals("Shanghai", edgeMapping.mappingValue("city", "2"));
+        Assert.assertEquals(18, edgeMapping.mappingValue("age", "young"));
+        Assert.assertEquals(35, edgeMapping.mappingValue("age", "middle"));
+
+        // not exist raw value
+        Assert.assertEquals("Wuhan", edgeMapping.mappingValue("city", "Wuhan"));
+        Assert.assertEquals("old", edgeMapping.mappingValue("age", "old"));
+    }
+
+    @Test
+    public void testMappingValueIsNull() {
+        List<String> sourceFields = Arrays.asList("id", "name", "age");
+        List<String> targetFields = Arrays.asList("id", "ISBN", "price");
+        EdgeMapping edgeMapping = new EdgeMapping(sourceFields, targetFields);
+        edgeMapping.label("buy");
+
+        Map<String, Object> cityMap = new HashMap<>();
+        // The value in value_mapping can't be null
+        cityMap.put("1", null);
+        cityMap.put("2", "Shanghai");
+
+        Map<String, Map<String, Object>> valueMapping = new HashMap<>();
+        valueMapping.put("city", cityMap);
+        edgeMapping.mappingValues(valueMapping);
+
+        Assert.assertThrows(IllegalArgumentException.class, edgeMapping::check);
+    }
+}
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/mapping/VertexMappingTest.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/mapping/VertexMappingTest.java
new file mode 100644
index 00000000..3f7b2d64
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/mapping/VertexMappingTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.mapping;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.hugegraph.testutil.Assert;
+import org.junit.Test;
+
+public class VertexMappingTest {
+
+    @Test
+    public void testBasicVertexMapping() {
+        VertexMapping vertexMapping = new VertexMapping("unique-id");
+        vertexMapping.label("person");
+
+        vertexMapping.check();
+
+        Assert.assertEquals("vertex", vertexMapping.type().dataType());
+        Assert.assertEquals("unique-id", vertexMapping.idField());
+        Assert.assertEquals("person", vertexMapping.label());
+        Assert.assertEquals("vertex-mapping(label=person)", vertexMapping.toString());
+    }
+
+    @Test
+    public void testAdvancedVertexMapping() {
+        VertexMapping vertexMapping = new VertexMapping("unique-id");
+        vertexMapping.label("person");
+        vertexMapping.batchSize(100);
+        Assert.assertEquals(100, vertexMapping.batchSize());
+
+        vertexMapping.selectedFields(new HashSet<>(Arrays.asList("id", "name", "lang", "price")));
+        vertexMapping.ignoredFields(new HashSet<>(Arrays.asList("ISBN", "age")));
+
+        String[] selectFields = vertexMapping.selectedFields().toArray(new String[0]);
+        Assert.assertArrayEquals(new String[]{"price", "name", "id", "lang"}, selectFields);
+        String[] ignoredFields = vertexMapping.ignoredFields().toArray(new String[0]);
+        Assert.assertArrayEquals(new String[]{"ISBN", "age"}, ignoredFields);
+
+        HashMap<String, String> fieldMapping = new HashMap<>();
+        fieldMapping.put("source_name", "name");
+        fieldMapping.put("target_name", "name");
+        vertexMapping.mappingFields(fieldMapping);
+    }
+
+    @Test
+    public void testMappingFieldValueNotExists() {
+        VertexMapping vertexMapping = new VertexMapping("unique-id");
+        vertexMapping.label("person");
+
+        HashMap<String, String> fieldMapping = new HashMap<>();
+        fieldMapping.put("source_name", "name");
+        vertexMapping.mappingFields(fieldMapping);
+
+        Assert.assertEquals("new_source_name", vertexMapping.mappingField("new_source_name"));
+    }
+
+    @Test
+    public void testMappingFieldValueIsNull() {
+        VertexMapping vertexMapping = new VertexMapping("unique-id");
+        vertexMapping.label("person");
+
+        HashMap<String, String> fieldMapping = new HashMap<>();
+        // The value in field_mapping can't be null
+        fieldMapping.put("source_name", null);
+        vertexMapping.mappingFields(fieldMapping);
+
+        Assert.assertThrows(IllegalArgumentException.class, vertexMapping::check);
+    }
+
+    @Test
+    public void testMappingValue() {
+        VertexMapping vertexMapping = new VertexMapping("unique-id");
+        vertexMapping.label("person");
+
+        Map<String, Object> cityMap = new HashMap<>();
+        cityMap.put("1", "Beijing");
+        cityMap.put("2", "Shanghai");
+        Map<String, Object> ageMap = new HashMap<>();
+        ageMap.put("young", 18);
+        ageMap.put("middle", 35);
+
+        Map<String, Map<String, Object>> valueMapping = new HashMap<>();
+        valueMapping.put("city", cityMap);
+        valueMapping.put("age", ageMap);
+
+        vertexMapping.mappingValues(valueMapping);
+
+        Assert.assertEquals("Beijing", vertexMapping.mappingValue("city", "1"));
+        Assert.assertEquals("Shanghai", vertexMapping.mappingValue("city", "2"));
+        Assert.assertEquals(18, vertexMapping.mappingValue("age", "young"));
+        Assert.assertEquals(35, vertexMapping.mappingValue("age", "middle"));
+
+        // raw values without a mapping entry are returned as-is
+        Assert.assertEquals("Wuhan", vertexMapping.mappingValue("city", "Wuhan"));
+        Assert.assertEquals("old", vertexMapping.mappingValue("age", "old"));
+    }
+
+    @Test
+    public void testMappingValueIsNull() {
+        VertexMapping vertexMapping = new VertexMapping("unique-id");
+        vertexMapping.label("person");
+
+        Map<String, Object> cityMap = new HashMap<>();
+        // The value in value_mapping can't be null
+        cityMap.put("1", null);
+        cityMap.put("2", "Shanghai");
+
+        Map<String, Map<String, Object>> valueMapping = new HashMap<>();
+        valueMapping.put("city", cityMap);
+        vertexMapping.mappingValues(valueMapping);
+
+        Assert.assertThrows(IllegalArgumentException.class, vertexMapping::check);
+    }
+}
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/options/HGOptionsTest.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/options/HGOptionsTest.java
new file mode 100644
index 00000000..03c10499
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/options/HGOptionsTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.options;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hugegraph.testutil.Assert;
+import org.junit.Test;
+
+public class HGOptionsTest {
+
+    private static final int CPUS = Runtime.getRuntime().availableProcessors();
+
+    @Test
+    public void testOptionDefaultValue() {
+        Map<String, String> requiredConfigs = new HashMap<>();
+        requiredConfigs.put("data-type", "vertex");
+        requiredConfigs.put("label", "person");
+        requiredConfigs.put("id", "name");
+
+        HGOptions hgOptions = new HGOptions(requiredConfigs);
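+        // options absent from the map fall back to the connector defaults asserted below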
+        Assert.assertEquals("localhost", hgOptions.host());
+        Assert.assertEquals(8080, hgOptions.port());
+        Assert.assertEquals("hugegraph", hgOptions.graph());
+        Assert.assertEquals("http", hgOptions.protocol());
+        Assert.assertNull(hgOptions.username());
+        Assert.assertNull(hgOptions.token());
+        Assert.assertEquals(60, hgOptions.timeout());
+        Assert.assertEquals(CPUS * 4, hgOptions.maxConnection());
+        Assert.assertEquals(CPUS * 2, hgOptions.maxConnectionPerRoute());
+        Assert.assertNull(hgOptions.trustStoreFile());
+        Assert.assertNull(hgOptions.trustStoreToken());
+
+        Assert.assertEquals(500, hgOptions.batchSize());
+        Assert.assertEquals(",", hgOptions.delimiter());
+        Assert.assertEquals(0, hgOptions.selectedFields().size());
+        Assert.assertEquals(0, hgOptions.ignoredFields().size());
+    }
+
+    @Test
+    public void testMissingDataType() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("label", "person");
+        configs.put("id", "name");
+        testMissingRequiredOption(configs);
+    }
+
+    @Test
+    public void testMissingLabel() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("data-type", "vertex");
+        configs.put("id", "name");
+        testMissingRequiredOption(configs);
+    }
+
+    @Test
+    public void testEdgeMissingSourceNames() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("data-type", "edge");
+        configs.put("label", "actIn");
+        testMissingRequiredOption(configs);
+    }
+
+    @Test
+    public void testEdgeMissingTargetNames() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("data-type", "edge");
+        configs.put("label", "actIn");
+        configs.put("source-name", "person,book");
+        testMissingRequiredOption(configs);
+    }
+
+    private void testMissingRequiredOption(Map<String, String> configs) {
+        Assert.assertThrows(IllegalArgumentException.class, () -> {
+            new HGOptions(configs);
+        });
+    }
+
+    @Test
+    public void testSelectedFields() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("data-type", "vertex");
+        configs.put("label", "person");
+        configs.put("id", "name");
+        configs.put("selected-fields", "id,name,age");
+
+        HGOptions hgOptions = new HGOptions(configs);
+        Set<String> selectedFieldSet = hgOptions.selectedFields();
+        String[] selectFields = selectedFieldSet.toArray(new String[0]);
+
+        Assert.assertArrayEquals(new String[]{"name", "id", "age"}, selectFields);
+    }
+
+    @Test
+    public void testIgnoredFields() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("data-type", "vertex");
+        configs.put("label", "person");
+        configs.put("id", "name");
+        configs.put("ignored-fields", "id,name,age");
+
+        HGOptions hgOptions = new HGOptions(configs);
+        Set<String> ignoreFieldSet = hgOptions.ignoredFields();
+        String[] ignoreFields = ignoreFieldSet.toArray(new String[0]);
+
+        Assert.assertArrayEquals(new String[]{"name", "id", "age"}, ignoreFields);
+    }
+
+    @Test
+    public void testSelectedAndIgnoredFields() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("data-type", "vertex");
+        configs.put("label", "person");
+        configs.put("id", "name");
+        configs.put("selected-fields", "id,name,age");
+        configs.put("ignored-fields", "price,ISBN");
+
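+        // selected-fields and ignored-fields are mutually exclusive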
+        Assert.assertThrows(IllegalArgumentException.class, () -> {
+            new HGOptions(configs);
+        });
+    }
+
+    @Test
+    public void testEdgeSourceAndTargetNames() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("data-type", "edge");
+        configs.put("label", "actIn");
+        configs.put("source-name", "person,book");
+        configs.put("target-name", "movie,competition");
+
+        HGOptions hgOptions = new HGOptions(configs);
+
+        String[] sourceNames = hgOptions.sourceName().toArray(new String[0]);
+        Assert.assertArrayEquals(new String[]{"person", "book"}, sourceNames);
+
+        String[] targetNames = hgOptions.targetName().toArray(new String[0]);
+        Assert.assertArrayEquals(new String[]{"movie", "competition"}, targetNames);
+    }
+}
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/DataTypeUtilsTest.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/DataTypeUtilsTest.java
new file mode 100644
index 00000000..dbca930e
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/DataTypeUtilsTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.utils;
+
+import java.util.Date;
+import java.util.UUID;
+
+import org.apache.hugegraph.structure.constant.Cardinality;
+import org.apache.hugegraph.structure.constant.DataType;
+import org.apache.hugegraph.structure.schema.PropertyKey;
+import org.apache.hugegraph.testutil.Assert;
+import org.junit.Test;
+
+public class DataTypeUtilsTest {
+
+    @Test
+    public void testParseNumber() {
+        Assert.assertEquals(12345,
+                            DataTypeUtils.parseNumber("valid-int-value", 12345L));
+        Assert.assertEquals(123456789987654L,
+                            DataTypeUtils.parseNumber("valid-long-value", 123456789987654L));
+        Assert.assertEquals(123456789987654L,
+                            DataTypeUtils.parseNumber("valid-long-value", "123456789987654"));
+        Assert.assertThrows(IllegalArgumentException.class, () -> {
+            DataTypeUtils.parseNumber("in-valid-long-value", "abc123456789");
+        });
+    }
+
+    @Test
+    public void testParseUUID() {
+        UUID uuid = UUID.randomUUID();
+        Assert.assertEquals(uuid, DataTypeUtils.parseUUID("uuid", uuid));
+        Assert.assertEquals(uuid, DataTypeUtils.parseUUID("string-uuid", uuid.toString()));
+        Assert.assertThrows(IllegalArgumentException.class, () -> {
+            DataTypeUtils.parseUUID("invalid-uuid", "hugegraph");
+        });
+    }
+
+    @Test
+    public void convertSingleString() {
+        PropertyKey propertyKey = new PropertyKey("name");
+        Assert.assertEquals("josh", DataTypeUtils.convert("josh", propertyKey));
+    }
+
+    @Test
+    public void convertSingleNumber() {
+        PropertyKey.Builder builder = new PropertyKey.BuilderImpl("number", null);
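+        // the same builder instance is reused below with a different data type per block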
+        PropertyKey bytePropertyKey = builder.dataType(DataType.BYTE)
+                                             .cardinality(Cardinality.SINGLE)
+                                             .build();
+        Assert.assertEquals((byte) 25, DataTypeUtils.convert("25", bytePropertyKey));
+        Assert.assertEquals((byte) 25, DataTypeUtils.convert(25, bytePropertyKey));
+
+        PropertyKey intPropertyKey = builder.dataType(DataType.INT)
+                                             .cardinality(Cardinality.SINGLE)
+                                             .build();
+        Assert.assertEquals(20230920, DataTypeUtils.convert("20230920", intPropertyKey));
+        Assert.assertEquals(20230920, DataTypeUtils.convert(20230920, intPropertyKey));
+
+        PropertyKey longPropertyKey = builder.dataType(DataType.LONG)
+                                            .cardinality(Cardinality.SINGLE)
+                                            .build();
+        Assert.assertEquals(202309205435343242L, DataTypeUtils.convert("202309205435343242",
+                                                                       longPropertyKey));
+        Assert.assertEquals(202309205435343242L, DataTypeUtils.convert(202309205435343242L,
+                                                                       longPropertyKey));
+
+        PropertyKey floatPropertyKey = builder.dataType(DataType.FLOAT)
+                                             .cardinality(Cardinality.SINGLE)
+                                             .build();
+        Assert.assertEquals(123.45f, DataTypeUtils.convert("123.45", floatPropertyKey));
+        Assert.assertEquals(123.45f, DataTypeUtils.convert(123.45, floatPropertyKey));
+
+        PropertyKey doublePropertyKey = builder.dataType(DataType.DOUBLE)
+                                              .cardinality(Cardinality.SINGLE)
+                                              .build();
+        Assert.assertEquals(1235443.48956783,
+                            DataTypeUtils.convert("1235443.48956783", doublePropertyKey));
+        Assert.assertEquals(1235443.48956783,
+                            DataTypeUtils.convert(1235443.48956783, doublePropertyKey));
+    }
+
+    @Test
+    public void testConvertSingleBoolean() {
+        PropertyKey.Builder builder = new PropertyKey.BuilderImpl("boolean", null);
+        PropertyKey booleanPropertyKey = builder.dataType(DataType.BOOLEAN)
+                                               .cardinality(Cardinality.SINGLE)
+                                               .build();
+
+        Assert.assertEquals(true, DataTypeUtils.convert(true, booleanPropertyKey));
+        Assert.assertEquals(true, DataTypeUtils.convert("true", booleanPropertyKey));
+        Assert.assertEquals(true, DataTypeUtils.convert("True", booleanPropertyKey));
+        Assert.assertEquals(true, DataTypeUtils.convert("TRUE", booleanPropertyKey));
+        Assert.assertEquals(true, DataTypeUtils.convert("yes", booleanPropertyKey));
+        Assert.assertEquals(true, DataTypeUtils.convert("y", booleanPropertyKey));
+        Assert.assertEquals(true, DataTypeUtils.convert("1", booleanPropertyKey));
+
+        Assert.assertEquals(false, DataTypeUtils.convert(false, booleanPropertyKey));
+        Assert.assertEquals(false, DataTypeUtils.convert("false", booleanPropertyKey));
+        Assert.assertEquals(false, DataTypeUtils.convert("False", booleanPropertyKey));
+        Assert.assertEquals(false, DataTypeUtils.convert("FALSE", booleanPropertyKey));
+        Assert.assertEquals(false, DataTypeUtils.convert("no", booleanPropertyKey));
+        Assert.assertEquals(false, DataTypeUtils.convert("n", booleanPropertyKey));
+        Assert.assertEquals(false, DataTypeUtils.convert("0", booleanPropertyKey));
+
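+        // values outside the accepted true/false spellings are rejected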
+        Assert.assertThrows(IllegalArgumentException.class, () -> {
+            DataTypeUtils.convert("right", booleanPropertyKey);
+        });
+
+        Assert.assertThrows(IllegalArgumentException.class, () -> {
+            DataTypeUtils.convert(123, booleanPropertyKey);
+        });
+    }
+
+    @Test
+    public void testConvertSingleUUID() {
+        PropertyKey.Builder builder = new PropertyKey.BuilderImpl("uuid", null);
+        PropertyKey uuidPropertyKey = builder.dataType(DataType.UUID)
+                                             .cardinality(Cardinality.SINGLE)
+                                             .build();
+        UUID uuid = UUID.randomUUID();
+        Assert.assertEquals(uuid, DataTypeUtils.convert(uuid, uuidPropertyKey));
+        Assert.assertEquals(uuid, DataTypeUtils.convert(uuid.toString(), uuidPropertyKey));
+        Assert.assertThrows(IllegalArgumentException.class, () -> {
+            DataTypeUtils.convert("hugegraph", uuidPropertyKey);
+        });
+    }
+
+    @Test
+    public void testConvertSingleDate() {
+        PropertyKey.Builder builder = new PropertyKey.BuilderImpl("date", null);
+        PropertyKey datePropertyKey = builder.dataType(DataType.DATE)
+                                             .cardinality(Cardinality.SINGLE)
+                                             .build();
+
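+        // dates convert from Date instances and from epoch milliseconds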
+        Date date = new Date(1695233374320L);
+        Assert.assertEquals(date, DataTypeUtils.convert(date, datePropertyKey));
+        Assert.assertEquals(date, DataTypeUtils.convert(1695233374320L, datePropertyKey));
+        // TODO check
+        // Assert.assertEquals(date, DataTypeUtils.convert("2023-09-21 02:09:34", datePropertyKey));
+        Assert.assertThrows(IllegalArgumentException.class, () -> {
+            DataTypeUtils.convert(date.toString(), datePropertyKey);
+        });
+        Assert.assertThrows(IllegalArgumentException.class, () -> {
+            DataTypeUtils.convert("abc", datePropertyKey);
+        });
+    }
+}
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/HGEnvUtils.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/HGEnvUtils.java
new file mode 100644
index 00000000..28f112d4
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/HGEnvUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.utils;
+
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.driver.SchemaManager;
+
+public class HGEnvUtils {
+
+    public static final String DEFAULT_HOST = "127.0.0.1";
+    public static final String DEFAULT_PORT = "8080";
+    public static final String DEFAULT_GRAPH = "hugegraph";
+    public static final String DEFAULT_URL = "http://" + DEFAULT_HOST + ":" + DEFAULT_PORT;
+
+    private static HugeClient hugeClient;
+
+    public static void createEnv() {
+        hugeClient = HugeClient.builder(DEFAULT_URL, DEFAULT_GRAPH).build();
+
+        hugeClient.graphs().clearGraph(DEFAULT_GRAPH, "I'm sure to delete all data");
+
+        SchemaManager schema = hugeClient.schema();
+
+        // Define schema
+        schema.propertyKey("name").asText().ifNotExist().create();
+        schema.propertyKey("age").asInt().ifNotExist().create();
+        schema.propertyKey("city").asText().ifNotExist().create();
+        schema.propertyKey("weight").asDouble().ifNotExist().create();
+        schema.propertyKey("lang").asText().ifNotExist().create();
+        schema.propertyKey("date").asText().ifNotExist().create();
+        schema.propertyKey("price").asDouble().ifNotExist().create();
+
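+        // 'person' uses customized string ids; 'software' derives ids from primary key 'name'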
+        schema.vertexLabel("person")
+              .properties("name", "age", "city")
+              .useCustomizeStringId()
+              .nullableKeys("age", "city")
+              .ifNotExist()
+              .create();
+
+        schema.vertexLabel("software")
+              .properties("name", "lang", "price")
+              .primaryKeys("name")
+              .ifNotExist()
+              .create();
+
+        schema.edgeLabel("knows")
+              .sourceLabel("person")
+              .targetLabel("person")
+              .properties("date", "weight")
+              .ifNotExist()
+              .create();
+
+        schema.edgeLabel("created")
+              .sourceLabel("person")
+              .targetLabel("software")
+              .properties("date", "weight")
+              .ifNotExist()
+              .create();
+    }
+
+    public static void destroyEnv() {
+        hugeClient.close();
+    }
+
+    public static HugeClient getHugeClient() {
+        return hugeClient;
+    }
+}
diff --git a/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/HGUtilsTest.java b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/HGUtilsTest.java
new file mode 100644
index 00000000..b25eb15f
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/java/org/apache/hugegraph/spark/connector/utils/HGUtilsTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hugegraph.spark.connector.mapping.EdgeMapping;
+import org.apache.hugegraph.spark.connector.mapping.VertexMapping;
+import org.apache.hugegraph.spark.connector.options.HGOptions;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HGUtilsTest {
+
+    @Test
+    public void testVertexMappingFromConf() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("data-type", "vertex");
+        configs.put("label", "person");
+        configs.put("id", "name");
+        configs.put("batch-size", "100");
+        configs.put("ignored-fields", "price,ISBN");
+
+        HGOptions options = new HGOptions(configs);
+        VertexMapping vertexMapping = HGUtils.vertexMappingFromConf(options);
+
+        Assert.assertEquals("vertex", vertexMapping.type().dataType());
+        Assert.assertEquals("person", vertexMapping.label());
+        Assert.assertEquals("name", vertexMapping.idField());
+        Assert.assertEquals(100, vertexMapping.batchSize());
+        Assert.assertArrayEquals(new String[]{"ISBN", "price"},
+                                 vertexMapping.ignoredFields().toArray());
+    }
+
+    @Test
+    public void testEdgeMappingFromConf() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("data-type", "edge");
+        configs.put("label", "buy");
+        configs.put("source-name", "id,name,age");
+        configs.put("target-name", "price,ISBN");
+        configs.put("batch-size", "200");
+        configs.put("selected-fields", "price,pages");
+
+        HGOptions options = new HGOptions(configs);
+        EdgeMapping edgeMapping = HGUtils.edgeMappingFromConf(options);
+
+        Assert.assertEquals("edge", edgeMapping.type().dataType());
+        Assert.assertEquals("buy", edgeMapping.label());
+        Assert.assertEquals(200, edgeMapping.batchSize());
+
+        Assert.assertArrayEquals(new String[]{"id", "name", "age"},
+                                 edgeMapping.sourceFields().toArray(new String[0]));
+        Assert.assertArrayEquals(new String[]{"price", "ISBN"},
+                                 edgeMapping.targetFields().toArray(new String[0]));
+        Assert.assertArrayEquals(new String[]{"pages", "price"},
+                                 edgeMapping.selectedFields().toArray(new String[0]));
+    }
+}
diff --git a/hugegraph-spark-connector/src/test/scala/org/apache/hugegraph/spark/connector/SinkExampleTest.scala b/hugegraph-spark-connector/src/test/scala/org/apache/hugegraph/spark/connector/SinkExampleTest.scala
new file mode 100644
index 00000000..2219c3b1
--- /dev/null
+++ b/hugegraph-spark-connector/src/test/scala/org/apache/hugegraph/spark/connector/SinkExampleTest.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.spark.connector
+
+import org.apache.hugegraph.driver.HugeClient
+import org.apache.hugegraph.spark.connector.utils.HGEnvUtils
+import org.apache.hugegraph.structure.graph.Vertex
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.runners.MethodSorters
+import org.junit.{AfterClass, BeforeClass, FixMethodOrder, Test}
+
+import java.util
+import scala.collection.JavaConverters.iterableAsScalaIterableConverter
+
+object SinkExampleTest {
+
+  var client: HugeClient = _
+
+  val sparkSession: SparkSession = SparkSession.builder()
+    .master("local[*]")
+    .appName(this.getClass.getSimpleName)
+    .getOrCreate()
+
+  @BeforeClass
+  def beforeClass(): Unit = {
+    HGEnvUtils.createEnv()
+    client = HGEnvUtils.getHugeClient
+  }
+
+  @AfterClass
+  def afterClass(): Unit = {
+    HGEnvUtils.destroyEnv()
+    sparkSession.stop()
+  }
+}
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+class SinkExampleTest {
+
+  val client: HugeClient = SinkExampleTest.client
+  val sparkSession: SparkSession = SinkExampleTest.sparkSession
+
+  val DEFAULT_ENTRANCE: String = "org.apache.hugegraph.spark.connector.DataSource"
+  val DEFAULT_HOST: String = HGEnvUtils.DEFAULT_HOST
+  val DEFAULT_PORT: String = HGEnvUtils.DEFAULT_PORT
+  val DEFAULT_GRAPH: String = HGEnvUtils.DEFAULT_GRAPH
+
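+  // @FixMethodOrder(NAME_ASCENDING) makes the 'First*' vertex tests run before the 'Second*' edge tests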
+  @Test
+  def testFirstInsertVertexPerson(): Unit = {
+    val df = sparkSession.createDataFrame(Seq(
+      Tuple3("marko", 29, "Beijing"),
+      Tuple3("vadas", 27, "HongKong"),
+      Tuple3("Josh", 32, "Beijing"),
+      Tuple3("peter", 35, "ShangHai"),
+      Tuple3("li,nary", 26, "Wu,han"),
+      Tuple3("Bob", 18, "HangZhou"),
+    )).toDF("name", "age", "city")
+
+    df.show()
+
+    df.write
+      .format(DEFAULT_ENTRANCE)
+      .option("host", DEFAULT_HOST)
+      .option("port", DEFAULT_PORT)
+      .option("graph", DEFAULT_GRAPH)
+      .option("data-type", "vertex")
+      .option("label", "person")
+      .option("id", "name")
+      .option("batch-size", 2)
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    val vertices: util.List[Vertex] = client.graph().listVertices("person")
+    assertEquals(6, vertices.size())
+  }
+
+  @Test
+  def testFirstInsertVertexSoftware(): Unit = {
+    val df = sparkSession.createDataFrame(Seq(
+      Tuple4("lop", "java", 328L, "ISBN978-7-107-18618-5"),
+      Tuple4("ripple", "python", 199L, "ISBN978-7-100-13678-5"),
+    )).toDF("name", "lang", "price", "ISBN")
+
+    df.show()
+
+    df.write
+      .format(DEFAULT_ENTRANCE)
+      .option("host", DEFAULT_HOST)
+      .option("port", DEFAULT_PORT)
+      .option("graph", DEFAULT_GRAPH)
+      .option("data-type", "vertex")
+      .option("label", "software")
+      .option("ignored-fields", "ISBN")
+      .option("batch-size", 2)
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    val vertices = client.graph().listVertices("software").asScala.toList
+    assertEquals(2, vertices.size)
+
+    for (vertex <- vertices) {
+      val properties = vertex.properties()
+      assertTrue(!properties.containsKey("ISBN"))
+    }
+  }
+
+  @Test
+  def testSecondInsertEdgeKnows(): Unit = {
+    val df = sparkSession.createDataFrame(Seq(
+      Tuple4("marko", "vadas", "20160110", 0.5),
+      Tuple4("peter", "Josh", "20230801", 1.0),
+      Tuple4("peter", "li,nary", "20130220", 2.0)
+    )).toDF("source", "target", "date", "weight")
+
+    df.show()
+
+    df.write
+      .format(DEFAULT_ENTRANCE)
+      .option("host", DEFAULT_HOST)
+      .option("port", DEFAULT_PORT)
+      .option("graph", DEFAULT_GRAPH)
+      .option("data-type", "edge")
+      .option("label", "knows")
+      .option("source-name", "source")
+      .option("target-name", "target")
+      .option("batch-size", 2)
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    val edges = client.graph().listEdges("knows")
+    assertEquals(3, edges.size())
+  }
+
+  @Test
+  def testSecondInsertEdgeCreated(): Unit = {
+    val df = sparkSession.createDataFrame(Seq(
+      Tuple4("marko", "lop", "20171210", 0.5),
+      Tuple4("Josh", "lop", "20091111", 0.4),
+      Tuple4("peter", "ripple", "20171210", 1.0),
+      Tuple4("vadas", "lop", "20171210", 0.2)
+    )).toDF("source", "name", "date", "weight")
+
+    df.show()
+
+    df.write
+      .format(DEFAULT_ENTRANCE)
+      .option("host", DEFAULT_HOST)
+      .option("port", DEFAULT_PORT)
+      .option("graph", DEFAULT_GRAPH)
+      .option("data-type", "edge")
+      .option("label", "created")
+      .option("source-name", "source") // customize
+      .option("target-name", "name") // pk
+      .option("batch-size", 2)
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    val edges = client.graph().listEdges("created")
+    assertEquals(4, edges.size())
+  }
+}
diff --git a/pom.xml b/pom.xml
index b11b9e7f..7eb4cb50 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,6 +90,7 @@
     <modules>
         <module>hugegraph-client</module>
         <module>hugegraph-loader</module>
+        <module>hugegraph-spark-connector</module>
         <module>hugegraph-tools</module>
         <module>hugegraph-hubble</module>
         <module>hugegraph-dist</module>