Posted to commits@hugegraph.apache.org by ji...@apache.org on 2022/11/10 08:46:27 UTC

[incubator-hugegraph-toolchain] branch master updated: feat(hbase): support gen HFile for hbase v2 (BETA) (#358)

This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git


The following commit(s) were added to refs/heads/master by this push:
     new a622f986 feat(hbase):  support gen HFile for hbase v2 (BETA) (#358)
a622f986 is described below

commit a622f986b22b1c920d047ab96296c509a77caeb9
Author: alan.zhao <30...@users.noreply.github.com>
AuthorDate: Thu Nov 10 16:46:22 2022 +0800

    feat(hbase):  support gen HFile for hbase v2 (BETA) (#358)
    
    * Fix bug: initialize SchemaCache, and put the latest schema into SchemaCache after querying it
    
    * Fix bug: flink-loader SchemaCache init
    
    * Pin the HBase version to 2.2.3
    
    * Fix third-party dependency check errors: add extra jars to known-dependencies.txt
    
    * MappingConverterTest: add backendStoreInfo, fix hugegraph-loader-ci check failure
    
    Co-authored-by: alanzhao <al...@126.com>
    Co-authored-by: imbajin <ji...@apache.org>
---
 .../scripts/dependency/known-dependencies.txt      |  27 +++
 hugegraph-loader/assembly/static/bin/get-params.sh |   2 +-
 .../static/example/spark/edge_created.json         |   4 +
 .../assembly/static/example/spark/edge_knows.json  |   2 +
 .../assembly/static/example/spark/schema.groovy    |  33 +++
 .../assembly/static/example/spark/struct.json      |  57 +++++
 .../static/example/spark/vertex_person.json        |   6 +
 .../static/example/spark/vertex_software.json      |   2 +
 hugegraph-loader/pom.xml                           | 127 ++++++++++
 .../hugegraph/loader/builder/EdgeBuilder.java      |  48 ++++
 .../hugegraph/loader/builder/ElementBuilder.java   |   3 +
 .../hugegraph/loader/builder/VertexBuilder.java    |  15 ++
 .../baidu/hugegraph/loader/constant/Constants.java |   4 +
 .../loader/direct/loader/DirectLoader.java         |  76 ++++++
 .../loader/direct/loader/HBaseDirectLoader.java    | 268 +++++++++++++++++++++
 .../hugegraph/loader/direct/util/SinkToHBase.java  | 165 +++++++++++++
 .../hugegraph/loader/executor/LoadOptions.java     |  38 +++
 .../hugegraph/loader/mapping/BackendStoreInfo.java |  83 +++++++
 .../hugegraph/loader/mapping/LoadMapping.java      |  14 ++
 .../loader/metrics/LoadDistributeMetrics.java      | 154 ++++++++++++
 .../loader/spark/HugeGraphSparkLoader.java         |  88 ++++++-
 .../hugegraph/loader/struct/GraphStructV1.java     |   8 +
 .../baidu/hugegraph/loader/util/MappingUtil.java   |   2 +-
 .../loader/test/unit/MappingConverterTest.java     |  18 +-
 pom.xml                                            |   1 +
 25 files changed, 1231 insertions(+), 14 deletions(-)

diff --git a/hugegraph-dist/scripts/dependency/known-dependencies.txt b/hugegraph-dist/scripts/dependency/known-dependencies.txt
index c71c7e01..57ba9bf1 100644
--- a/hugegraph-dist/scripts/dependency/known-dependencies.txt
+++ b/hugegraph-dist/scripts/dependency/known-dependencies.txt
@@ -179,6 +179,24 @@ hadoop-yarn-client-3.1.1.jar
 hadoop-yarn-common-3.1.1.jar
 hamcrest-2.1.jar
 hamcrest-core-1.3.jar
+hbase-client-2.2.3.jar
+hbase-common-2.2.3.jar
+hbase-hadoop-compat-2.2.3.jar
+hbase-hadoop2-compat-2.2.3.jar
+hbase-http-2.2.3.jar
+hbase-mapreduce-2.2.3.jar
+hbase-metrics-2.2.3.jar
+hbase-metrics-api-2.2.3.jar
+hbase-procedure-2.2.3.jar
+hbase-protocol-2.2.3.jar
+hbase-protocol-shaded-2.2.3.jar
+hbase-replication-2.2.3.jar
+hbase-server-2.2.3.jar
+hbase-shaded-client-byo-hadoop-2.2.3.jar
+hbase-shaded-miscellaneous-2.2.1.jar
+hbase-shaded-netty-2.2.1.jar
+hbase-shaded-protobuf-2.2.1.jar
+hbase-zookeeper-2.2.3.jar
 hibernate-validator-6.0.17.Final.jar
 hive-classification-3.1.2.jar
 hive-classification-3.1.3.jar
@@ -225,6 +243,7 @@ hk2-utils-3.0.1.jar
 hppc-0.7.2.jar
 htrace-core4-4.0.1-incubating.jar
 htrace-core4-4.1.0-incubating.jar
+htrace-core4-4.2.0-incubating.jar
 httpclient-4.5.13.jar
 httpclient-4.5.2.jar
 httpclient-4.5.9.jar
@@ -578,3 +597,11 @@ zookeeper-3.4.9.jar
 zookeeper-3.6.2.jar
 zookeeper-jute-3.6.2.jar
 zstd-jni-1.5.0-4.jar
+disruptor-3.3.6.jar
+findbugs-annotations-1.3.9-1.jar
+jamon-runtime-2.4.1.jar
+javax.el-3.0.1-b12.jar
+javax.servlet.jsp-2.3.2.jar
+javax.servlet.jsp-api-2.3.1.jar
+jcodings-1.0.18.jar
+joni-2.1.11.jar
diff --git a/hugegraph-loader/assembly/static/bin/get-params.sh b/hugegraph-loader/assembly/static/bin/get-params.sh
index c73ebbbd..e0302399 100644
--- a/hugegraph-loader/assembly/static/bin/get-params.sh
+++ b/hugegraph-loader/assembly/static/bin/get-params.sh
@@ -27,7 +27,7 @@ function get_params() {
       --incremental-mode | --failure-mode | --batch-insert-threads | --single-insert-threads | \
       --max-conn | --max-conn-per-route | --batch-size | --max-parse-errors | --max-insert-errors | \
       --timeout | --shutdown-timeout | --retry-times | --retry-interval | --check-vertex | \
-      --print-progress | --dry-run | --help)
+      --print-progress | --dry-run | --sink-type | --vertex-partitions | --edge-partitions | --help )
         HUGEGRAPH_PARAMS="$HUGEGRAPH_PARAMS $1 $2"
         shift 2
         ;;
diff --git a/hugegraph-loader/assembly/static/example/spark/edge_created.json b/hugegraph-loader/assembly/static/example/spark/edge_created.json
new file mode 100644
index 00000000..ba093eab
--- /dev/null
+++ b/hugegraph-loader/assembly/static/example/spark/edge_created.json
@@ -0,0 +1,4 @@
+{"source_name": "marko", "target_id": 1, "date": "2017-12-10", "weight": 0.4}
+{"source_name": "josh", "target_id": 1, "date": "2009-11-11", "weight": 0.4}
+{"source_name": "josh", "target_id": 2, "date": "2017-12-10", "weight": 1.0}
+{"source_name": "peter", "target_id": 1, "date": "2017-03-24", "weight": 0.2}
diff --git a/hugegraph-loader/assembly/static/example/spark/edge_knows.json b/hugegraph-loader/assembly/static/example/spark/edge_knows.json
new file mode 100644
index 00000000..0c2af9ad
--- /dev/null
+++ b/hugegraph-loader/assembly/static/example/spark/edge_knows.json
@@ -0,0 +1,2 @@
+{"source_name": "marko", "target_name": "vadas", "date": "20160110", "weight": 0.5}
+{"source_name": "marko", "target_name": "josh", "date": "20130220", "weight": 1.0}
diff --git a/hugegraph-loader/assembly/static/example/spark/schema.groovy b/hugegraph-loader/assembly/static/example/spark/schema.groovy
new file mode 100644
index 00000000..15e829d3
--- /dev/null
+++ b/hugegraph-loader/assembly/static/example/spark/schema.groovy
@@ -0,0 +1,33 @@
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asInt().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
+schema.propertyKey("weight").asDouble().ifNotExist().create();
+schema.propertyKey("lang").asText().ifNotExist().create();
+schema.propertyKey("date").asText().ifNotExist().create();
+schema.propertyKey("price").asDouble().ifNotExist().create();
+
+schema.vertexLabel("person")
+      .properties("name", "age", "city")
+      .primaryKeys("name")
+      .nullableKeys("age", "city")
+      .ifNotExist()
+      .create();
+schema.vertexLabel("software")
+        .properties("name", "lang", "price")
+        .primaryKeys("name")
+      .ifNotExist()
+      .create();
+
+schema.edgeLabel("knows")
+      .sourceLabel("person")
+      .targetLabel("person")
+      .properties("date", "weight")
+      .ifNotExist()
+      .create();
+schema.edgeLabel("created")
+      .sourceLabel("person")
+      .targetLabel("software")
+      .properties("date", "weight")
+      .ifNotExist()
+      .create();
diff --git a/hugegraph-loader/assembly/static/example/spark/struct.json b/hugegraph-loader/assembly/static/example/spark/struct.json
new file mode 100644
index 00000000..275bdbd3
--- /dev/null
+++ b/hugegraph-loader/assembly/static/example/spark/struct.json
@@ -0,0 +1,57 @@
+{
+  "vertices": [
+    {
+      "label": "person",
+      "input": {
+        "type": "file",
+        "path": "example/spark/vertex_person.json",
+        "format": "JSON",
+        "header": ["name", "age", "city"],
+        "charset": "UTF-8",
+        "skipped_line": {
+          "regex": "(^#|^//).*"
+        }
+      },
+      "id": "name",
+      "null_values": ["NULL", "null", ""]
+    },
+    {
+      "label": "software",
+      "input": {
+        "type": "file",
+        "path": "example/spark/vertex_software.json",
+        "format": "JSON",
+        "header": ["id","name", "lang", "price","ISBN"],
+        "charset": "GBK"
+      },
+      "id": "name",
+      "ignored": ["ISBN"]
+    }
+  ],
+  "edges": [
+    {
+      "label": "knows",
+      "source": ["source_name"],
+      "target": ["target_name"],
+      "input": {
+        "type": "file",
+        "path": "example/spark/edge_knows.json",
+        "format": "JSON",
+        "date_format": "yyyyMMdd",
+        "header": ["source_name","target_name", "date", "weight"]
+      },
+      "field_mapping": {
+        "source_name": "name",
+        "target_name": "name"
+      }
+    }
+  ],
+  "backendStoreInfo":
+  {
+    "edge_tablename": "hugegraph:g_oe",
+    "vertex_tablename": "hugegraph:g_v",
+    "hbase_zookeeper_quorum": "127.0.0.1",
+    "hbase_zookeeper_property_clientPort": "2181",
+    "zookeeper_znode_parent": "/hbase"
+  }
+}
diff --git a/hugegraph-loader/assembly/static/example/spark/vertex_person.json b/hugegraph-loader/assembly/static/example/spark/vertex_person.json
new file mode 100644
index 00000000..e018df41
--- /dev/null
+++ b/hugegraph-loader/assembly/static/example/spark/vertex_person.json
@@ -0,0 +1,6 @@
+{"name": "marko", "age": "29",  "city": "Beijing"}
+{"name": "vadas", "age": "27",  "city": "Hongkong"}
+{"name": "josh", "age": "32",  "city": "Beijing"}
+{"name": "peter", "age": "35",  "city": "Shanghai"}
+{"name": "li,nary", "age": "26",  "city": "Wu,han"}
+{"name": "tom", "age": "null",  "city": "NULL"}
diff --git a/hugegraph-loader/assembly/static/example/spark/vertex_software.json b/hugegraph-loader/assembly/static/example/spark/vertex_software.json
new file mode 100644
index 00000000..cd9dbf3c
--- /dev/null
+++ b/hugegraph-loader/assembly/static/example/spark/vertex_software.json
@@ -0,0 +1,2 @@
+{ "name": "lop",  "lang": "java","price": "328","ISBN": "ISBN978-7-107-18618-5"}
+{ "name": "ripple",  "lang": "java","price": "199","ISBN": "ISBN978-7-100-13678-5"}
diff --git a/hugegraph-loader/pom.xml b/hugegraph-loader/pom.xml
index b959395f..2b2ace69 100644
--- a/hugegraph-loader/pom.xml
+++ b/hugegraph-loader/pom.xml
@@ -173,6 +173,117 @@
             <version>30.0-jre</version>
         </dependency>
 
+        <!--hbase -->
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <version>${hbase.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.github.stephenc.findbugs</groupId>
+                    <artifactId>findbugs-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-json</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${hbase.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>hadoop-auth</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-shaded-client-byo-hadoop</artifactId>
+            <!--<artifactId>hbase-client</artifactId>-->
+            <version>${hbase.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-ipc</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-mapreduce</artifactId>
+            <version>${hbase.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>hadoop-auth</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>javax.ws.rs-api</artifactId>
+                    <groupId>javax.ws.rs</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>jersey-client</artifactId>
+                    <groupId>org.glassfish.jersey.core</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>jersey-common</artifactId>
+                    <groupId>org.glassfish.jersey.core</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>jersey-server</artifactId>
+                    <groupId>org.glassfish.jersey.core</groupId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-distcp</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <!-- hadoop dependency -->
         <dependency>
             <groupId>org.apache.hadoop</groupId>
@@ -342,10 +453,18 @@
                     <groupId>org.apache.parquet</groupId>
                     <artifactId>parquet-hadoop-bundle</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
                 <exclusion>
                     <groupId>org.apache.parquet</groupId>
                     <artifactId>parquet-column</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                </exclusion>
                 <exclusion>
                     <groupId>org.apache.thrift</groupId>
                     <artifactId>libfb303</artifactId>
@@ -354,6 +473,14 @@
                     <groupId>com.fasterxml.jackson.core</groupId>
                     <artifactId>*</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-distcp</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-core</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
     </dependencies>
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/EdgeBuilder.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/EdgeBuilder.java
index e3d411ef..e384e208 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/EdgeBuilder.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/EdgeBuilder.java
@@ -37,6 +37,7 @@ import com.baidu.hugegraph.structure.schema.SchemaLabel;
 import com.baidu.hugegraph.structure.schema.VertexLabel;
 import org.apache.hugegraph.util.E;
 import com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.Row;
 
 public class EdgeBuilder extends ElementBuilder<Edge> {
 
@@ -109,6 +110,53 @@ public class EdgeBuilder extends ElementBuilder<Edge> {
         }
         return edges;
     }
+    
+    @Override
+    public List<Edge> build(Row row) {
+        String[] names = row.schema().fieldNames();
+        Object[] values = new Object[row.size()];
+        for (int i = 0; i < row.size(); i++) {
+            values[i] = row.get(i);
+        }
+        if (this.vertexIdsIndex == null ||
+            !Arrays.equals(this.lastNames, names)) {
+            this.vertexIdsIndex = this.extractVertexIdsIndex(names);
+        }
+
+        this.lastNames = names;
+        EdgeKVPairs kvPairs = this.newEdgeKVPairs();
+        kvPairs.source.extractFromEdge(names, values,
+                this.vertexIdsIndex.sourceIndexes);
+        kvPairs.target.extractFromEdge(names, values,
+                this.vertexIdsIndex.targetIndexes);
+        kvPairs.extractProperties(names, values);
+
+        List<Vertex> sources = kvPairs.source.buildVertices(false);
+        List<Vertex> targets = kvPairs.target.buildVertices(false);
+        if (sources.isEmpty() || targets.isEmpty()) {
+            return ImmutableList.of();
+        }
+        E.checkArgument(sources.size() == 1 || targets.size() == 1 ||
+                        sources.size() == targets.size(),
+                        "The elements number of source and target must be: " +
+                        "1 to n, n to 1, n to n");
+        int size = Math.max(sources.size(), targets.size());
+        List<Edge> edges = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            Vertex source = i < sources.size() ?
+                            sources.get(i) : sources.get(0);
+            Vertex target = i < targets.size() ?
+                            targets.get(i) : targets.get(0);
+            Edge edge = new Edge(this.mapping.label());
+            edge.source(source);
+            edge.target(target);
+            // Add properties
+            this.addProperties(edge, kvPairs.properties);
+            this.checkNonNullableKeys(edge);
+            edges.add(edge);
+        }
+        return edges;
+    }
 
     private EdgeKVPairs newEdgeKVPairs() {
         EdgeKVPairs kvPairs = new EdgeKVPairs();
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/ElementBuilder.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/ElementBuilder.java
index aff8e7be..72002336 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/ElementBuilder.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/ElementBuilder.java
@@ -54,6 +54,7 @@ import com.baidu.hugegraph.structure.schema.VertexLabel;
 import org.apache.hugegraph.util.E;
 import org.apache.hugegraph.util.LongEncoding;
 import com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.Row;
 
 public abstract class ElementBuilder<GE extends GraphElement> {
 
@@ -75,6 +76,8 @@ public abstract class ElementBuilder<GE extends GraphElement> {
 
     public abstract List<GE> build(String[] names, Object[] values);
 
+    public abstract List<GE> build(Row row);
+
     public abstract SchemaLabel schemaLabel();
 
     protected abstract Collection<String> nonNullableKeys();
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/VertexBuilder.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/VertexBuilder.java
index 2366bc2c..633bdea5 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/VertexBuilder.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/VertexBuilder.java
@@ -28,6 +28,8 @@ import com.baidu.hugegraph.loader.mapping.VertexMapping;
 import com.baidu.hugegraph.structure.graph.Vertex;
 import com.baidu.hugegraph.structure.schema.SchemaLabel;
 import com.baidu.hugegraph.structure.schema.VertexLabel;
+
+import org.apache.spark.sql.Row;
 import org.apache.hugegraph.util.E;
 
 public class VertexBuilder extends ElementBuilder<Vertex> {
@@ -59,6 +61,19 @@ public class VertexBuilder extends ElementBuilder<Vertex> {
         return kvPairs.buildVertices(true);
     }
 
+    @Override
+    public List<Vertex> build(Row row) {
+        VertexKVPairs kvPairs = this.newKVPairs(this.vertexLabel,
+                                                this.mapping.unfold());
+        String[] names = row.schema().fieldNames();
+        Object[] values = new Object[row.size()];
+        for (int i = 0; i < row.size(); i++) {
+            values[i] = row.get(i);
+        }
+        kvPairs.extractFromVertex(names, values);
+        return kvPairs.buildVertices(true);
+    }
+
     @Override
     public SchemaLabel schemaLabel() {
         return this.vertexLabel;
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/constant/Constants.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/constant/Constants.java
index 6f577f00..fb42312a 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/constant/Constants.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/constant/Constants.java
@@ -79,4 +79,8 @@ public final class Constants {
 
     public static final String CDC_DATA = "data";
     public static final String CDC_OP = "op";
+    public static final String HBASE_COL_FAMILY = "f";
+    public static final String LOAD_DATA_PARSE_SUFFIX = "parse";
+    public static final String LOAD_DATA_SER_SUFFIX = "ser";
+    public static final String LOAD_DATA_INSERT_SUFFIX = "insert";
 }
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/DirectLoader.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/DirectLoader.java
new file mode 100644
index 00000000..c197f90b
--- /dev/null
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/DirectLoader.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+
+public abstract class DirectLoader<T,R> implements Serializable {
+
+    LoadOptions loadOptions;
+    InputStruct struct;
+
+    public DirectLoader(LoadOptions loadOptions,
+                        InputStruct struct) {
+        this.loadOptions = loadOptions;
+        this.struct = struct;
+    }
+
+    public final void bulkload(Dataset<Row> ds) {
+        JavaPairRDD<T, R> javaPairRDD = buildVertexAndEdge(ds);
+        String path = generateFiles(javaPairRDD);
+        loadFiles(path);
+    }
+
+    protected List<ElementBuilder> getElementBuilders() {
+        LoadContext context = new LoadContext(loadOptions);
+        context.schemaCache().updateAll();
+        List<ElementBuilder> buildersForGraphElement = new LinkedList<>();
+        for (VertexMapping vertexMapping : struct.vertices()) {
+            buildersForGraphElement.add(
+                    new VertexBuilder(context, struct, vertexMapping)
+            );
+        }
+        for (EdgeMapping edgeMapping : struct.edges()) {
+            buildersForGraphElement.add(new EdgeBuilder(context, struct, edgeMapping));
+        }
+        context.close();
+        return buildersForGraphElement;
+    }
+
+    abstract JavaPairRDD<T, R> buildVertexAndEdge(Dataset<Row> ds);
+
+    abstract String generateFiles(JavaPairRDD<T,  R> buildAndSerRdd);
+
+    abstract void loadFiles(String path);
+}
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java
new file mode 100644
index 00000000..b2c10af9
--- /dev/null
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.direct.util.SinkToHBase;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
+import com.baidu.hugegraph.loader.util.HugeClientHolder;
+import com.baidu.hugegraph.serializer.direct.HBaseSerializer;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.Vertex;
+
+import org.apache.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, KeyValue> {
+
+    private SinkToHBase sinkToHBase;
+    private LoadDistributeMetrics loadDistributeMetrics;
+
+    public static final Logger LOG = Log.logger(HBaseDirectLoader.class);
+
+    public HBaseDirectLoader(LoadOptions loadOptions,
+                             InputStruct struct,
+                             LoadDistributeMetrics loadDistributeMetrics) {
+        super(loadOptions,struct);
+        this.loadDistributeMetrics = loadDistributeMetrics;
+        this.sinkToHBase = new SinkToHBase(loadOptions);
+
+    }
+
+    public String getTableName() {
+
+        String tableName = null;
+        if (struct.edges().size() > 0) {
+            tableName = this.loadOptions.edgeTablename;
+
+        } else if (struct.vertices().size() > 0) {
+            tableName = this.loadOptions.vertexTablename;
+
+        }
+        return tableName;
+    }
+
+    public Integer getTablePartitions () {
+        return struct.edges().size() > 0 ?
+               loadOptions.edgePartitions :
+               loadOptions.vertexPartitions;
+    }
+
+    public JavaPairRDD<ImmutableBytesWritable, KeyValue> buildVertexAndEdge(Dataset<Row> ds) {
+        LOG.info("Start build vertexs and edges");
+        JavaPairRDD<ImmutableBytesWritable, KeyValue> tuple2KeyValueJavaPairRDD =
+                ds.toJavaRDD().mapPartitionsToPair(
+                new PairFlatMapFunction<Iterator<Row>, ImmutableBytesWritable, KeyValue>() {
+                    @Override
+                    public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(
+                           Iterator<Row> rowIterator) throws Exception {
+
+                            HBaseSerializer serializer = new HBaseSerializer(
+                                    HugeClientHolder.create(loadOptions),
+                                    loadOptions.vertexPartitions,loadOptions.edgePartitions);
+                            List<ElementBuilder> buildersForGraphElement = getElementBuilders();
+                            List<Tuple2<ImmutableBytesWritable,  KeyValue>> result =
+                                    new LinkedList<>();
+                            while (rowIterator.hasNext()) {
+                                Row row = rowIterator.next();
+                                List<Tuple2<ImmutableBytesWritable, KeyValue>> serList =
+                                        buildAndSer(serializer, row,buildersForGraphElement);
+                                result.addAll(serList);
+                            }
+                                serializer.close();
+                                return result.iterator();
+                    }
+                }
+        );
+        return tuple2KeyValueJavaPairRDD;
+    }
+
+    @Override
+    String generateFiles(JavaPairRDD<ImmutableBytesWritable, KeyValue> buildAndSerRdd) {
+        LOG.info("Start to generate hfile");
+        try {
+            Tuple2<SinkToHBase.IntPartitioner, TableDescriptor> tuple =
+                    sinkToHBase.getPartitionerByTableName(getTablePartitions(), getTableName());
+            Partitioner partitioner = (Partitioner) tuple._1;
+            TableDescriptor tableDescriptor = (TableDescriptor) tuple._2;
+
+            JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRdd =
+                    buildAndSerRdd.repartitionAndSortWithinPartitions(partitioner);
+            Configuration conf = sinkToHBase.getHBaseConfiguration().get();
+            Job job = Job.getInstance(conf);
+            HFileOutputFormat2.configureIncrementalLoadMap(job, tableDescriptor);
+            conf.set("hbase.mapreduce.hfileoutputformat.table.name",
+                    tableDescriptor.getTableName().getNameAsString());
+            String path = getHFilePath(job.getConfiguration());
+            repartitionedRdd.saveAsNewAPIHadoopFile(
+                    path,
+                    ImmutableBytesWritable.class,
+                    KeyValue.class,
+                    HFileOutputFormat2.class,
+                    conf
+            );
+            LOG.info("Saved HFiles to: '{}'", path);
+            flushPermission(conf,path);
+            return path;
+        } catch (IOException e) {
+            LOG.error("Failed to generate files", e);
+        }
+        return Constants.EMPTY_STR;
+    }
+
+    public String   getHFilePath (Configuration conf) throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        long timeStr = System.currentTimeMillis();
+        String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen" + "/" + timeStr + "/";
+        Path hfileGenPath = new Path(pathStr);
+        if (fs.exists(hfileGenPath)) {
+            LOG.info("\n Delete the path where the hfile is generated,path {} ", pathStr);
+            fs.delete(hfileGenPath,true);
+        }
+        return pathStr;
+    }
+
+    @Override
+    public void loadFiles(String path) {
+        try {
+            sinkToHBase.loadHfiles(path, getTableName());// BulkLoad HFile to HBase
+        } catch (Exception e) {
+            LOG.error(" Failed to load hfiles", e);
+        }
+    }
+
+    private void flushPermission (Configuration conf, String path) {
+        FsShell shell = new FsShell(conf);
+        try {
+            LOG.info("Chmod hfile directory permission");
+            shell.run(new String[]{"-chmod", "-R", "777", path});
+            shell.close();
+        } catch (Exception e) {
+            LOG.error("Couldnt change the file permissions " + e +
+                    " Please run command:" +
+                    "hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles " + path +
+                    " '" + "test" + "'\n" + " to load generated HFiles into HBase table");
+        }
+    }
+
+    List<Tuple2<ImmutableBytesWritable,  KeyValue>> buildAndSer (HBaseSerializer serializer,
+                                                                 Row row,
+                                                                 List<ElementBuilder> builders) {
+        List<GraphElement> elementsElement;
+
+        List<Tuple2<ImmutableBytesWritable,  KeyValue>> result = new LinkedList<>();
+
+        for (ElementBuilder builder : builders) {
+            ElementMapping elementMapping = builder.mapping();
+            if (elementMapping.skip()) {
+                continue;
+            }
+            if ("".equals(row.mkString())) {
+                break;
+            }
+            switch (struct.input().type()) {
+                case FILE:
+                case HDFS:
+                    elementsElement = builder.build(row);
+                    break;
+                default:
+                    throw new AssertionError(String.format(
+                            "Unsupported input source '%s'",
+                            struct.input().type()));
+            }
+
+            boolean isVertex = builder.mapping().type().isVertex();
+            if (isVertex) {
+                for (Vertex vertex : (List<Vertex>) (Object) elementsElement) {
+                    LOG.debug("vertex already build done {} ", vertex.toString());
+                    Tuple2<ImmutableBytesWritable, KeyValue> tuple2 =
+                            vertexSerialize(serializer,vertex);
+                    loadDistributeMetrics.increaseDisVertexInsertSuccess(builder.mapping());
+                    result.add(tuple2);
+                }
+            } else {
+                for (Edge edge : (List<Edge>) (Object) elementsElement) {
+                    LOG.debug("edge already build done {}", edge.toString());
+                    Tuple2<ImmutableBytesWritable, KeyValue> tuple2 =
+                            edgeSerialize(serializer,edge);
+                    loadDistributeMetrics.increaseDisEdgeInsertSuccess(builder.mapping());
+                    result.add(tuple2);
+
+                }
+            }
+        }
+        return result;
+    }
+
+    private Tuple2<ImmutableBytesWritable, KeyValue> edgeSerialize (HBaseSerializer serializer,
+                                                                    Edge edge) {
+        LOG.debug("edge start serialize {}", edge.toString());
+        byte[] rowkey = serializer.getKeyBytes(edge);
+        byte[] values = serializer.getValueBytes(edge);
+        ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
+        rowKey.set(rowkey);
+        KeyValue keyValue = new KeyValue(rowkey,
+                Bytes.toBytes(Constants.HBASE_COL_FAMILY),
+                Bytes.toBytes(Constants.EMPTY_STR),
+                values);
+        return new Tuple2<>(rowKey,keyValue);
+    }
+
+    private Tuple2<ImmutableBytesWritable, KeyValue> vertexSerialize (HBaseSerializer serializer,
+                                                                      Vertex vertex) {
+        LOG.debug("vertex start serialize {}", vertex.toString());
+        byte[] rowkey = serializer.getKeyBytes(vertex);
+        byte[] values = serializer.getValueBytes(vertex);
+        ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
+        rowKey.set(rowkey);
+        KeyValue keyValue = new KeyValue(rowkey,
+                Bytes.toBytes(Constants.HBASE_COL_FAMILY),
+                Bytes.toBytes(Constants.EMPTY_STR),
+                values);
+        return new Tuple2<>(rowKey,keyValue);
+    }
+}
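
For context on how these pieces fit together, here is a minimal driver sketch (illustration only, not part of this commit): the SparkSession setup, the input path, and the structs() accessor on LoadMapping are assumptions; the commit wires the real entry point through HugeGraphSparkLoader and the new --sink-type/--vertex-partitions/--edge-partitions options.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    import com.baidu.hugegraph.loader.direct.loader.HBaseDirectLoader;
    import com.baidu.hugegraph.loader.executor.LoadOptions;
    import com.baidu.hugegraph.loader.mapping.InputStruct;
    import com.baidu.hugegraph.loader.mapping.LoadMapping;
    import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;

    public class HBaseDirectLoadSketch {
        public static void main(String[] args) {
            // Assumed Spark setup and input path, for illustration only
            SparkSession spark = SparkSession.builder()
                                             .appName("hbase-direct-load-sketch")
                                             .getOrCreate();
            Dataset<Row> ds = spark.read().json("example/spark/vertex_person.json");

            // struct.json carries the new backendStoreInfo block; copy it into LoadOptions
            // (in the real loader, LoadOptions is parsed from the CLI flags added above)
            LoadOptions options = new LoadOptions();
            LoadMapping mapping = LoadMapping.of("example/spark/struct.json");
            options.copyBackendStoreInfo(mapping.getBackendStoreInfo());

            InputStruct struct = mapping.structs().get(0);   // structs() accessor assumed
            LoadDistributeMetrics metrics = new LoadDistributeMetrics(struct);
            metrics.init(spark.sparkContext());

            // DirectLoader template method:
            // buildVertexAndEdge() -> generateFiles() -> loadFiles()
            new HBaseDirectLoader(options, struct, metrics).bulkload(ds);

            spark.stop();
        }
    }
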
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/util/SinkToHBase.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/util/SinkToHBase.java
new file mode 100644
index 00000000..99ea1d78
--- /dev/null
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/util/SinkToHBase.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.util;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hugegraph.util.Log;
+import org.apache.spark.Partitioner;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+
+import scala.Tuple2;
+
+public class SinkToHBase implements Serializable {
+
+    private LoadOptions loadOptions;
+    public static final Logger LOG = Log.logger(SinkToHBase.class);
+
+    public SinkToHBase(LoadOptions loadOptions) {
+        this.loadOptions = loadOptions;
+    }
+
+    public Optional<Configuration> getHBaseConfiguration() {
+        Configuration baseConf = HBaseConfiguration.create();
+        baseConf.set("hbase.zookeeper.quorum", this.loadOptions.hbaseZKQuorum);
+        baseConf.set("hbase.zookeeper.property.clientPort", this.loadOptions.hbaseZKPort);
+        baseConf.set("zookeeper.znode.parent", this.loadOptions.hbaseZKParent);
+        return Optional.ofNullable(baseConf);
+    }
+
+    private Optional<Connection> getConnection() {
+        Optional<Configuration> baseConf = getHBaseConfiguration();
+        Connection conn = null;
+        try {
+            conn = ConnectionFactory.createConnection(baseConf.get());
+        } catch (IOException e) {
+            LOG.error("get hbase connection failed",e);
+        }
+        return Optional.ofNullable(conn);
+    }
+
+    public Tuple2<IntPartitioner, TableDescriptor>
+            getPartitionerByTableName (int numPartitions, String tableName) throws IOException {
+        Optional<Connection> optionalConnection = getConnection();
+        TableDescriptor descriptor = optionalConnection
+                .get()
+                .getTable(TableName.valueOf(tableName))
+                .getDescriptor();
+        LOG.debug("getPartitionerByTableName get TableDescriptor " +
+                descriptor.getTableName());
+        optionalConnection.get().close();
+        return new Tuple2<IntPartitioner,TableDescriptor>(
+                new IntPartitioner(numPartitions, tableName),descriptor);
+    }
+
+    public void loadHfiles(String path, String tableName) throws Exception {
+        Connection conn = getConnection().get();
+        Table table = conn.getTable(TableName.valueOf(tableName));
+        Configuration conf = conn.getConfiguration();
+        BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(conf);
+        bulkLoadHFilesTool.bulkLoad(table.getName(), new Path(path));
+        table.close();
+        conn.close();
+    }
+
+    public class IntPartitioner extends Partitioner {
+
+        private final int numPartitions;
+        public Map<List<String>, Integer> rangeMap = new HashMap<>();
+        private String tableName;
+
+        public IntPartitioner(int numPartitions, String tableName) throws IOException {
+            this.numPartitions = numPartitions;
+            this.rangeMap = getRangeMap(tableName);
+            this.tableName = tableName;
+        }
+
+        private Map<List<String>, Integer> getRangeMap(String tableName) throws IOException {
+            Connection conn = getConnection().get();
+            HRegionLocator locator =
+                    (HRegionLocator) conn.getRegionLocator(TableName.valueOf(tableName));
+            Pair<byte[][], byte[][]> startEndKeys = locator.getStartEndKeys();
+            Map<List<String>, Integer> rangeMap = new HashMap<>();
+            for (int i = 0; i < startEndKeys.getFirst().length; i++) {
+                String startKey = Bytes.toString(startEndKeys.getFirst()[i]);
+                String endKey = Bytes.toString(startEndKeys.getSecond()[i]);
+                rangeMap.put(new ArrayList<>(Arrays.asList(startKey, endKey)), i);
+            }
+            conn.close();
+            return rangeMap;
+        }
+
+        @Override
+        public int numPartitions() {
+            return numPartitions;
+        }
+
+        @Override
+        public int getPartition(Object key) {
+            if (key instanceof ImmutableBytesWritable) {
+                try {
+                    ImmutableBytesWritable immutableBytesWritableKey = (ImmutableBytesWritable) key;
+                    if (rangeMap == null || rangeMap.isEmpty()) {
+                        rangeMap = getRangeMap(this.tableName);
+                    }
+                    String keyString = Bytes.toString(immutableBytesWritableKey.get());
+                    for (List<String> range : rangeMap.keySet()) {
+                        if (keyString.compareToIgnoreCase(range.get(0)) >= 0 &&
+                           ((keyString.compareToIgnoreCase(range.get(1)) < 0) ||
+                           range.get(1).equals(""))) {
+                            return rangeMap.get(range);
+                        }
+                    }
+                    LOG.error("Didn't find proper key in rangeMap, so returning 0 ...");
+                    return 0;
+                } catch (Exception e) {
+                    LOG.error("When trying to get partitionID, " +
+                              "encountered exception: {} \t key = {}", e, key);
+                    return 0;
+                }
+            } else {
+                LOG.error("key is NOT ImmutableBytesWritable type ...");
+                return 0;
+            }
+        }
+    }
+}
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java
index c90b1d74..118b9737 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.Serializable;
 import java.util.Set;
 
+import com.baidu.hugegraph.loader.mapping.BackendStoreInfo;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 
@@ -206,6 +207,34 @@ public class LoadOptions implements Serializable {
                description = "Print usage of HugeGraphLoader")
     public boolean help;
 
+    @Parameter(names = {"--sink-type"}, arity = 1,
+               description = "Sink to different storage")
+    public boolean sinkType = true;
+
+    @Parameter(names = {"--edge-partitions"}, arity = 1,
+               description = "The number of partitions of the HBase edge table")
+    public int edgePartitions = 64;
+
+    @Parameter(names = {"--vertex-partitions"}, arity = 1,
+               description = "The number of partitions of the HBase vertex table")
+    public int vertexPartitions = 64;
+
+    @Parameter(names = {"edgeTablename"}, arity = 1,
+               description = "edgeTablename")
+    public String edgeTablename;
+    @Parameter(names = {"vertexTablename"}, arity = 1,
+               description = "vertexTablename")
+    public String vertexTablename;
+    @Parameter(names = {"hbaseZKQuorum"}, arity = 1,
+               description = "hbaseZKQuorum")
+    public String hbaseZKQuorum;
+    @Parameter(names = {"hbaseZKPort"}, arity = 1,
+               description = "hbaseZKPort")
+    public String hbaseZKPort;
+    @Parameter(names = {"hbaseZKParent"}, arity = 1,
+               description = "hbaseZKParent")
+    public String hbaseZKParent;
+
     public String workModeString() {
         if (this.incrementalMode) {
             return "INCREMENTAL MODE";
@@ -263,6 +292,15 @@ public class LoadOptions implements Serializable {
         return options;
     }
 
+    public void copyBackendStoreInfo (BackendStoreInfo backendStoreInfo) {
+        E.checkArgument(null != backendStoreInfo, "The backendStoreInfo can't be null");
+        this.edgeTablename = backendStoreInfo.getEdgeTablename();
+        this.vertexTablename = backendStoreInfo.getVertexTablename();
+        this.hbaseZKParent = backendStoreInfo.getHbaseZKParent();
+        this.hbaseZKPort = backendStoreInfo.getHbaseZKPort();
+        this.hbaseZKQuorum = backendStoreInfo.getHbaseZKQuorum();
+    }
+
     public static class UrlValidator implements IParameterValidator {
 
         @Override
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/BackendStoreInfo.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/BackendStoreInfo.java
new file mode 100644
index 00000000..1b533262
--- /dev/null
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/BackendStoreInfo.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.mapping;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+
+@JsonPropertyOrder({"edge_tablename", "vertex_tablename", "hbase_zookeeper_quorum",
+                    "hbase_zookeeper_property_clientPort", "zookeeper_znode_parent"})
+public class BackendStoreInfo {
+
+    @JsonProperty("edge_tablename")
+    private String edgeTablename;
+    @JsonProperty("vertex_tablename")
+    private String vertexTablename;
+    @JsonProperty("hbase_zookeeper_quorum")
+    private String hbaseZKQuorum;
+    @JsonProperty("hbase_zookeeper_property_clientPort")
+    private String hbaseZKPort;
+    @JsonProperty("zookeeper_znode_parent")
+    private String hbaseZKParent;
+
+    public String getEdgeTablename() {
+        return edgeTablename;
+    }
+
+    public void setEdgeTablename(String edgeTablename) {
+        this.edgeTablename = edgeTablename;
+    }
+
+    public String isVertexTablename() {
+        return vertexTablename;
+    }
+
+    public void setVertexTablename(String vertexTablename) {
+        this.vertexTablename = vertexTablename;
+    }
+
+    public String getVertexTablename() {
+        return vertexTablename;
+    }
+
+    public String getHbaseZKQuorum() {
+        return hbaseZKQuorum;
+    }
+
+    public void setHbaseZKQuorum(String hbaseZKQuorum) {
+        this.hbaseZKQuorum = hbaseZKQuorum;
+    }
+
+    public String getHbaseZKPort() {
+        return hbaseZKPort;
+    }
+
+    public void setHbaseZKPort(String hbaseZKPort) {
+        this.hbaseZKPort = hbaseZKPort;
+    }
+
+    public String getHbaseZKParent() {
+        return hbaseZKParent;
+    }
+
+    public void setHbaseZKParent(String hbaseZKParent) {
+        this.hbaseZKParent = hbaseZKParent;
+    }
+}
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/LoadMapping.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/LoadMapping.java
index 48f5beca..c3afd736 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/LoadMapping.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/LoadMapping.java
@@ -53,6 +53,12 @@ public class LoadMapping implements Checkable {
     private String version;
     @JsonProperty("structs")
     private List<InputStruct> structs;
+    @JsonProperty("backendStoreInfo")
+    private BackendStoreInfo backendStoreInfo;
+
+    public BackendStoreInfo getBackendStoreInfo() {
+        return backendStoreInfo;
+    }
 
     public static LoadMapping of(String filePath) {
         File file = FileUtils.getFile(filePath);
@@ -81,6 +87,14 @@ public class LoadMapping implements Checkable {
         this.structs = structs;
     }
 
+    @JsonCreator
+    public LoadMapping(@JsonProperty("structs") List<InputStruct> structs,
+                       @JsonProperty("backendStoreInfo") BackendStoreInfo backendStoreInfo) {
+        this.version = Constants.V2_STRUCT_VERSION;
+        this.structs = structs;
+        this.backendStoreInfo = backendStoreInfo;
+    }
+
     @Override
     public void check() throws IllegalArgumentException {
         E.checkArgument(!StringUtils.isEmpty(this.version),
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/metrics/LoadDistributeMetrics.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/metrics/LoadDistributeMetrics.java
new file mode 100644
index 00000000..3345cd16
--- /dev/null
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/metrics/LoadDistributeMetrics.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.metrics;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import org.apache.spark.SparkContext;
+import org.apache.spark.util.LongAccumulator;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public final class LoadDistributeMetrics implements Serializable {
+
+    private final InputStruct struct;
+    private   Map<String, Metrics> vertexDisMetrics;
+    private   Map<String, Metrics> edgeDisMetrics;
+
+    public LoadDistributeMetrics(InputStruct struct) {
+        this.struct = struct;
+        this.vertexDisMetrics = new HashMap<>();
+        this.edgeDisMetrics = new HashMap<>();
+        for (VertexMapping mapping : struct.vertices()) {
+            this.vertexDisMetrics.put(mapping.label(), new Metrics());
+        }
+        for (EdgeMapping mapping : struct.edges()) {
+            this.edgeDisMetrics.put(mapping.label(), new Metrics());
+        }
+    }
+
+    public void init(SparkContext sc) {
+        for (VertexMapping mapping : this.struct.vertices()) {
+            Metrics metrics = this.vertexDisMetrics.get(mapping.label());
+            metrics.insertSuccess = sc.longAccumulator(mapping.label() +
+                    Constants.UNDERLINE_STR + Constants.LOAD_DATA_INSERT_SUFFIX);
+            metrics.parseSuccess = sc.longAccumulator(mapping.label() +
+                    Constants.UNDERLINE_STR + Constants.LOAD_DATA_PARSE_SUFFIX);
+        }
+        for (EdgeMapping mapping : this.struct.edges()) {
+            Metrics metrics = this.edgeDisMetrics.get(mapping.label());
+            metrics.insertSuccess = sc.longAccumulator(mapping.label() +
+                    Constants.UNDERLINE_STR + Constants.LOAD_DATA_INSERT_SUFFIX);
+            metrics.parseSuccess = sc.longAccumulator(mapping.label() +
+                    Constants.UNDERLINE_STR + Constants.LOAD_DATA_PARSE_SUFFIX);
+        }
+    }
+
+    public void increaseDisVertexParseSuccess(ElementMapping mapping) {
+        this.disMetrics(mapping).parseSuccess.add(1);
+    }
+
+    public void pluseDisVertexParseSuccess(ElementMapping mapping, Long count) {
+        this.disMetrics(mapping).parseSuccess.add(count);
+    }
+
+    public void increaseDisVertexInsertSuccess(ElementMapping mapping) {
+        this.disMetrics(mapping).insertSuccess.add(1);
+    }
+
+    public void plusDisVertexInsertSuccess(ElementMapping mapping, Long count) {
+        this.disMetrics(mapping).insertSuccess.add(count);
+    }
+
+    public void increaseDisEdgeParseSuccess(ElementMapping mapping) {
+        this.disMetrics(mapping).parseSuccess.add(1);
+    }
+
+    public void pluseDisEdgeParseSuccess(ElementMapping mapping, Long count) {
+        this.disMetrics(mapping).parseSuccess.add(count);
+    }
+
+    public void increaseDisEdgeInsertSuccess(ElementMapping mapping) {
+        this.disMetrics(mapping).insertSuccess.add(1);
+    }
+
+    public void plusDisEdgeInsertSuccess(ElementMapping mapping, Long count) {
+        this.disMetrics(mapping).insertSuccess.add(count);
+    }
+
+    public Long readVertexInsertSuccess() {
+        Long totalCnt = 0L;
+        Collection<Metrics> values = this.vertexDisMetrics.values();
+        for (Metrics metrics : values) {
+            totalCnt += metrics.insertSuccess();
+        }
+        return totalCnt;
+    }
+
+    public Long readEdgeInsertSuccess() {
+        Long totalCnt = 0L;
+        Collection<Metrics> values = this.edgeDisMetrics.values();
+        for (Metrics metrics : values) {
+            totalCnt += metrics.insertSuccess();
+        }
+        return totalCnt;
+    }
+
+    private Metrics disMetrics(ElementMapping mapping) {
+        if (mapping.type().isVertex()) {
+            return this.vertexDisMetrics.get(mapping.label());
+        } else {
+            return this.edgeDisMetrics.get(mapping.label());
+        }
+    }
+
+    public static class Metrics implements Serializable {
+
+        private LongAccumulator parseSuccess;
+        private LongAccumulator parseFailure;   // declared but not yet registered in init()
+        private LongAccumulator insertSuccess;
+        private LongAccumulator insertFailure;  // declared but not yet registered in init()
+
+        public Metrics() {
+        }
+
+        public long parseSuccess() {
+            return this.parseSuccess.value();
+        }
+
+        public long parseFailure() {
+            return this.parseFailure.value();
+        }
+
+        public long insertSuccess() {
+            return this.insertSuccess.value();
+        }
+
+        public long insertFailure() {
+            return this.insertFailure.value();
+        }
+    }
+}
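
Note: a short usage sketch of the accumulator API added above may help reviewers; the class and method names below (MetricsUsageSketch, report) are illustrative and not part of this patch.

    import com.baidu.hugegraph.loader.mapping.InputStruct;
    import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
    import org.apache.spark.sql.SparkSession;

    public class MetricsUsageSketch {

        // Register the accumulators on the driver, let the executor-side loader
        // bump them via plusDisVertexInsertSuccess()/plusDisEdgeInsertSuccess(),
        // then read the totals back on the driver once the job has finished.
        public static void report(SparkSession session, InputStruct struct) {
            LoadDistributeMetrics metrics = new LoadDistributeMetrics(struct);
            metrics.init(session.sparkContext());

            // ... run the bulkload / API load here ...

            long vertices = metrics.readVertexInsertSuccess();
            long edges = metrics.readEdgeInsertSuccess();
            System.out.printf("inserted %d vertices, %d edges%n", vertices, edges);
        }
    }
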
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java
index ae821303..acb188ae 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java
@@ -23,6 +23,7 @@ import com.baidu.hugegraph.driver.GraphManager;
 import com.baidu.hugegraph.loader.builder.EdgeBuilder;
 import com.baidu.hugegraph.loader.builder.ElementBuilder;
 import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.direct.loader.HBaseDirectLoader;
 import com.baidu.hugegraph.loader.executor.LoadContext;
 import com.baidu.hugegraph.loader.executor.LoadOptions;
 import com.baidu.hugegraph.loader.mapping.EdgeMapping;
@@ -30,6 +31,7 @@ import com.baidu.hugegraph.loader.mapping.ElementMapping;
 import com.baidu.hugegraph.loader.mapping.InputStruct;
 import com.baidu.hugegraph.loader.mapping.LoadMapping;
 import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
 import com.baidu.hugegraph.loader.source.InputSource;
 import com.baidu.hugegraph.loader.source.file.Compression;
 import com.baidu.hugegraph.loader.source.file.FileFilter;
@@ -46,11 +48,14 @@ import com.baidu.hugegraph.structure.graph.UpdateStrategy;
 import com.baidu.hugegraph.structure.graph.Vertex;
 import org.apache.hugegraph.util.Log;
 
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
 import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.StructField;
+import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 
 import java.io.Serializable;
@@ -89,29 +94,92 @@ public class HugeGraphSparkLoader implements Serializable {
     public void load() {
         LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
         List<InputStruct> structs = mapping.structs();
+        boolean sinkType = this.loadOptions.sinkType;
+        if (!sinkType) {
+            this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
+        }
+        SparkConf conf = new SparkConf()
+                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Kryo serialization
+                .set("spark.kryo.registrationRequired", "true");
+        try {
+            conf.registerKryoClasses(new Class[] {
+                    org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
+                    org.apache.hadoop.hbase.KeyValue.class,
+                    org.apache.spark.sql.types.StructType.class,
+                    StructField[].class,
+                    StructField.class,
+                    org.apache.spark.sql.types.LongType$.class,
+                    org.apache.spark.sql.types.Metadata.class,
+                    org.apache.spark.sql.types.StringType$.class,
+                    Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
+                    Class.forName("scala.reflect.ClassTag$$anon$1"),
+                    Class.forName("scala.collection.immutable.Set$EmptySet$"),
+                    Class.forName("org.apache.spark.sql.types.DoubleType$")
+                 });
+        } catch (ClassNotFoundException e) {
+            LOG.error("Spark Kryo class registration failed", e);
+        }
+        SparkSession session = SparkSession.builder()
+                                           .config(conf)
+                                           .getOrCreate();
+        SparkContext sc = session.sparkContext();
 
-        SparkSession session = SparkSession.builder().getOrCreate();
+        LongAccumulator totalInsertSuccess = sc.longAccumulator("totalInsertSuccess");
         for (InputStruct struct : structs) {
+            LOG.info("\n Initializes the accumulator corresponding to the  {} ",
+                     struct.input().asFileSource().path());
+            LoadDistributeMetrics loadDistributeMetrics = new LoadDistributeMetrics(struct);
+            loadDistributeMetrics.init(sc);
+            LOG.info("\n  Start to load data, data info is: \t {} ",
+                     struct.input().asFileSource().path());
             Dataset<Row> ds = read(session, struct);
-            ds.foreachPartition((Iterator<Row> p) -> {
-                LoadContext context = initPartition(this.loadOptions, struct);
-                p.forEachRemaining((Row row) -> {
-                    loadRow(struct, row, p, context);
+            if (sinkType) {
+                LOG.info("Start to load data using Spark APIs");
+                ds.foreachPartition((Iterator<Row> p) -> {
+                    LoadContext context = initPartition(this.loadOptions, struct);
+                    p.forEachRemaining((Row row) -> {
+                        loadRow(struct, row, p, context);
+                    });
+                    context.close();
                 });
-                context.close();
-            });
+            } else {
+                LOG.info("Start to load data using Spark bulkload (generate HFiles)");
+                HBaseDirectLoader directLoader = new HBaseDirectLoader(loadOptions, struct,
+                                                                       loadDistributeMetrics);
+                directLoader.bulkload(ds);
+            }
+            collectLoadMetrics(loadDistributeMetrics, totalInsertSuccess);
+            LOG.info("Finished loading data from: {}",
+                     struct.input().asFileSource().path());
         }
+        Long totalInsertSuccessCnt = totalInsertSuccess.value();
+        LOG.info("The data load task is complete, total insert success count: {}",
+                 totalInsertSuccessCnt);
+
+        sc.stop();
         session.close();
         session.stop();
     }
 
+    private void collectLoadMetrics(LoadDistributeMetrics loadMetrics,
+                                    LongAccumulator totalInsertSuccess) {
+        Long edgeInsertSuccess = loadMetrics.readEdgeInsertSuccess();
+        Long vertexInsertSuccess = loadMetrics.readVertexInsertSuccess();
+        totalInsertSuccess.add(edgeInsertSuccess);
+        totalInsertSuccess.add(vertexInsertSuccess);
+    }
+
     private LoadContext initPartition(
             LoadOptions loadOptions, InputStruct struct) {
         LoadContext context = new LoadContext(loadOptions);
         for (VertexMapping vertexMapping : struct.vertices()) {
-            this.builders.put(
-                    new VertexBuilder(context, struct, vertexMapping),
-                    new ArrayList<>());
+            this.builders.put(new VertexBuilder(context, struct, vertexMapping),
+                              new ArrayList<>());
         }
         for (EdgeMapping edgeMapping : struct.edges()) {
             this.builders.put(new EdgeBuilder(context, struct, edgeMapping),
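
Since spark.kryo.registrationRequired is enabled above, any class that reaches Kryo without being registered fails the job with a "Class is not registered" error. Below is a minimal sketch of how the registration list can be extended; the extra Class.forName entry is only an example of appending a class reported at runtime, not part of this patch.

    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;

    public class KryoConfSketch {

        // Build a SparkSession with Kryo serialization and strict registration,
        // appending any class name reported by a "Class is not registered" error.
        public static SparkSession build() throws ClassNotFoundException {
            SparkConf conf = new SparkConf()
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    .set("spark.kryo.registrationRequired", "true");
            conf.registerKryoClasses(new Class[] {
                    org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
                    org.apache.hadoop.hbase.KeyValue.class,
                    // example of appending a class reported at runtime:
                    Class.forName("scala.collection.immutable.Set$EmptySet$")
            });
            return SparkSession.builder().config(conf).getOrCreate();
        }
    }
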
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/struct/GraphStructV1.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/struct/GraphStructV1.java
index f38e4b20..1b9af294 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/struct/GraphStructV1.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/struct/GraphStructV1.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import com.baidu.hugegraph.loader.mapping.BackendStoreInfo;
 import org.apache.commons.collections.ListUtils;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -56,6 +57,9 @@ public class GraphStructV1 implements Checkable {
     @JsonProperty("edges")
     private final List<EdgeStructV1> edgeStructs;
 
+    @JsonProperty("backendStoreInfo")
+    private BackendStoreInfo backendStoreInfo;
+
     public GraphStructV1() {
         this.vertexStructs = new ArrayList<>();
         this.edgeStructs = new ArrayList<>();
@@ -76,6 +80,10 @@ public class GraphStructV1 implements Checkable {
         }
     }
 
+    public BackendStoreInfo getBackendStoreInfo() {
+        return this.backendStoreInfo;
+    }
+
     @Override
     public void check() throws IllegalArgumentException {
         LOG.info("Checking vertex mapping descriptions");
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/util/MappingUtil.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/util/MappingUtil.java
index b43d8b55..a9667806 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/util/MappingUtil.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/util/MappingUtil.java
@@ -129,7 +129,7 @@ public final class MappingUtil {
             inputStruct.id(String.valueOf(++id));
             inputStructs.add(inputStruct);
         }
-        return new LoadMapping(inputStructs);
+        return new LoadMapping(inputStructs, graphStruct.getBackendStoreInfo());
     }
 
     private static ElementMapping convertV1ToV2(ElementStructV1 origin) {
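
With the conversion above now preserving backendStoreInfo, the bulkload path can read it from the resulting LoadMapping and copy it into LoadOptions, mirroring what HugeGraphSparkLoader.load() does. A minimal sketch follows; the class and method names (BackendStoreInfoSketch, wire) are illustrative only.

    import com.baidu.hugegraph.loader.executor.LoadOptions;
    import com.baidu.hugegraph.loader.mapping.BackendStoreInfo;
    import com.baidu.hugegraph.loader.mapping.LoadMapping;

    public class BackendStoreInfoSketch {

        // Read the mapping file, then hand its backend store info (HBase table
        // names and ZooKeeper address) to LoadOptions for HBaseDirectLoader.
        public static void wire(LoadOptions loadOptions, String mappingFile) {
            LoadMapping mapping = LoadMapping.of(mappingFile);
            BackendStoreInfo info = mapping.getBackendStoreInfo();
            if (info != null) {
                loadOptions.copyBackendStoreInfo(info);
            }
        }
    }
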
diff --git a/hugegraph-loader/src/test/java/com/baidu/hugegraph/loader/test/unit/MappingConverterTest.java b/hugegraph-loader/src/test/java/com/baidu/hugegraph/loader/test/unit/MappingConverterTest.java
index c937daed..c2a14b01 100644
--- a/hugegraph-loader/src/test/java/com/baidu/hugegraph/loader/test/unit/MappingConverterTest.java
+++ b/hugegraph-loader/src/test/java/com/baidu/hugegraph/loader/test/unit/MappingConverterTest.java
@@ -73,7 +73,15 @@ public class MappingConverterTest {
                 "        \"Rating\": \"rate\"" +
                 "      }" +
                 "    }" +
-                "  ]" +
+                "  ]," +
+                "  \"backendStoreInfo\":" +
+                "  {" +
+                "    \"edge_tablename\": \"hugegraph:g_oe\"," +
+                "    \"vertex_tablename\": \"hugegraph:g_v\"," +
+                "    \"hbase_zookeeper_quorum\": \"127.0.0.1\"," +
+                "    \"hbase_zookeeper_property_clientPort\": \"2181\"," +
+                "    \"zookeeper_znode_parent\": \"/hbase\"" +
+                "  }" +
                 "}";
         String input = "struct.json";
         File inputFile = new File(input);
@@ -117,7 +125,13 @@ public class MappingConverterTest {
                 "\"field_mapping\":{\"UserID\":\"id\",\"MovieID\":\"id\"," +
                 "\"Rating\":\"rate\"},\"value_mapping\":{},\"selected\":[]," +
                 "\"ignored\":[\"Timestamp\"],\"null_values\":[\"\"]," +
-                "\"update_strategies\":{},\"batch_size\":500}]}]}";
+                "\"update_strategies\":{},\"batch_size\":500}]}]," +
+                "\"backendStoreInfo\":{" +
+                "\"edge_tablename\":\"hugegraph:g_oe\"," +
+                "\"vertex_tablename\":\"hugegraph:g_v\"," +
+                "\"hbase_zookeeper_quorum\":\"127.0.0.1\"," +
+                "\"hbase_zookeeper_property_clientPort\":\"2181\"," +
+                "\"zookeeper_znode_parent\":\"/hbase\"}}";
         Assert.assertEquals(expectV2Json, actualV2Json);
 
         FileUtils.forceDelete(inputFile);
diff --git a/pom.xml b/pom.xml
index 3a003778..6e32e74f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,6 +80,7 @@
         <docker.hub>hugegraph</docker.hub>
         <docker.repo>${project.name}</docker.repo>
         <docker.tag>${project.version}</docker.tag>
+        <hbase.version>2.2.3</hbase.version>
     </properties>
 
     <dependencies>