Posted to commits@hugegraph.apache.org by ji...@apache.org on 2022/11/10 08:52:01 UTC

[incubator-hugegraph-toolchain] branch client-rename updated (964ac5b1 -> d0b4ad99)

This is an automated email from the ASF dual-hosted git repository.

jin pushed a change to branch client-rename
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git


    from 964ac5b1 adapt to org.apache package (#364)
     add 295e25c1 chore: enable flatten while building (#356)
     add a622f986 feat(hbase):  support gen HFile for hbase v2 (BETA) (#358)
     new d0b4ad99 Merge branch 'master' into client-rename

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../loader/direct/loader/DirectLoader.java         | 18 ++++----
 .../loader/direct/loader/HBaseDirectLoader.java    | 28 ++++++-------
 .../hugegraph/loader/direct/util/SinkToHBase.java  | 14 +++----
 .../hugegraph/loader/executor/LoadOptions.java     |  2 +-
 .../hugegraph/loader/mapping/LoadMapping.java      |  2 +-
 .../loader/spark/HugeGraphSparkLoader.java         | 49 ++++++++++------------
 .../apache/hugegraph/loader/util/MappingUtil.java  |  2 +-
 .../loader/test/unit/MappingConverterTest.java     | 18 +++++++-
 pom.xml                                            |  6 +--
 9 files changed, 73 insertions(+), 66 deletions(-)
 copy hugegraph-loader/src/main/java/{org/apache => com/baidu}/hugegraph/loader/direct/loader/DirectLoader.java (82%)
 copy hugegraph-loader/src/main/java/{org/apache => com/baidu}/hugegraph/loader/direct/loader/HBaseDirectLoader.java (93%)
 copy hugegraph-loader/src/main/java/{org/apache => com/baidu}/hugegraph/loader/direct/util/SinkToHBase.java (95%)


[incubator-hugegraph-toolchain] 01/01: Merge branch 'master' into client-rename

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch client-rename
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git

commit d0b4ad994d5cb4c54f47aaf9c07084dbc5c5498b
Merge: 964ac5b1 a622f986
Author: imbajin <ji...@apache.org>
AuthorDate: Thu Nov 10 16:51:43 2022 +0800

    Merge branch 'master' into client-rename

 .../loader/direct/loader/DirectLoader.java         |  76 ++++++
 .../loader/direct/loader/HBaseDirectLoader.java    | 268 +++++++++++++++++++++
 .../hugegraph/loader/direct/util/SinkToHBase.java  | 165 +++++++++++++
 .../hugegraph/loader/executor/LoadOptions.java     |   2 +-
 .../hugegraph/loader/mapping/LoadMapping.java      |   2 +-
 .../loader/spark/HugeGraphSparkLoader.java         |  49 ++--
 .../apache/hugegraph/loader/util/MappingUtil.java  |   2 +-
 .../loader/test/unit/MappingConverterTest.java     |  18 +-
 pom.xml                                            |   6 +-
 9 files changed, 554 insertions(+), 34 deletions(-)

diff --cc hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java
index 0d6d5317,00000000..162fd157
mode 100644,000000..100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java
@@@ -1,373 -1,0 +1,373 @@@
 +/*
 + * Copyright 2017 HugeGraph Authors
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership. The ASF
 + * licenses this file to You under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.hugegraph.loader.executor;
 +
 +import java.io.File;
 +import java.io.Serializable;
 +import java.util.Set;
 +
 +import org.apache.hugegraph.loader.util.LoadUtil;
 +import org.apache.hugegraph.loader.mapping.BackendStoreInfo;
 +import org.apache.commons.lang3.StringUtils;
 +import org.slf4j.Logger;
 +
 +import org.apache.hugegraph.loader.constant.Constants;
 +import org.apache.hugegraph.util.E;
 +import org.apache.hugegraph.util.Log;
 +import com.beust.jcommander.IParameterValidator;
 +import com.beust.jcommander.JCommander;
 +import com.beust.jcommander.Parameter;
 +import com.beust.jcommander.ParameterException;
 +import com.google.common.collect.ImmutableSet;
 +
 +public class LoadOptions implements Serializable {
 +
 +    private static final Logger LOG = Log.logger(LoadOptions.class);
 +
 +    public static final String HTTPS_SCHEMA = "https";
 +    public static final String HTTP_SCHEMA = "http";
 +    private static final int CPUS = Runtime.getRuntime().availableProcessors();
 +
 +    @Parameter(names = {"-f", "--file"}, required = true, arity = 1,
 +               validateWith = {FileValidator.class},
 +               description = "The path of the data mapping description file")
 +    public String file;
 +
 +    @Parameter(names = {"-s", "--schema"}, arity = 1,
 +               validateWith = {FileValidator.class},
 +               description = "The schema file path which to create manually")
 +    public String schema;
 +
 +    @Parameter(names = {"-g", "--graph"}, required = true, arity = 1,
 +               description = "The namespace of the graph to load into")
 +    public String graph;
 +
 +    @Parameter(names = {"-h", "--host"}, arity = 1,
 +               validateWith = {UrlValidator.class},
 +               description = "The host/IP of HugeGraphServer")
 +    public String host = "localhost";
 +
 +    @Parameter(names = {"-p", "--port"}, arity = 1,
 +               validateWith = {PositiveValidator.class},
 +               description = "The port of HugeGraphServer")
 +    public int port = 8080;
 +
 +    @Parameter(names = {"--username"}, arity = 1,
 +               description = "The username of graph for authentication")
 +    public String username = null;
 +
 +    @Parameter(names = {"--protocol"}, arity = 1,
 +               validateWith = {ProtocolValidator.class},
 +               description = "The protocol of HugeGraphServer, " +
 +                             "allowed values are: http or https")
 +    public String protocol = "http";
 +
 +    @Parameter(names = {"--trust-store-file"}, arity = 1,
 +               description = "The path of client truststore file used " +
 +                             "when https protocol is enabled")
 +    public String trustStoreFile = null;
 +
 +    @Parameter(names = {"--trust-store-password"}, arity = 1,
 +               description = "The password of client truststore file used " +
 +                             "when https protocol is enabled")
 +    public String trustStoreToken = null;
 +
 +    @Parameter(names = {"--token"}, arity = 1,
 +               description = "The token of graph for authentication")
 +    public String token = null;
 +
 +    @Parameter(names = {"--clear-all-data"}, arity = 1,
 +               description = "Whether to clear all old data before loading")
 +    public boolean clearAllData = false;
 +
 +    @Parameter(names = {"--clear-timeout"}, arity = 1,
 +               validateWith = {PositiveValidator.class},
 +               description = "The timeout waiting for clearing all data ")
 +    public int clearTimeout = 240;
 +
 +    @Parameter(names = {"--incremental-mode"}, arity = 1,
 +               description = "Load data from the breakpoint of last time")
 +    public boolean incrementalMode = false;
 +
 +    @Parameter(names = {"--failure-mode"}, arity = 1,
 +               description = "Load data from the failure records, in this " +
 +                             "mode, only full load is supported, any read " +
 +                             "or parsing errors will cause load task stop")
 +    public boolean failureMode = false;
 +
 +    @Parameter(names = {"--batch-insert-threads"}, arity = 1,
 +               validateWith = {PositiveValidator.class},
 +               description = "The number of threads to execute batch insert")
 +    public int batchInsertThreads = CPUS;
 +
 +    @Parameter(names = {"--single-insert-threads"}, arity = 1,
 +               validateWith = {PositiveValidator.class},
 +               description = "The number of threads to execute single insert")
 +    public int singleInsertThreads = 8;
 +
 +    @Parameter(names = {"--max-conn"}, arity = 1,
 +               description = "Max number of HTTP connections to server")
 +    public int maxConnections = CPUS * 4;
 +
 +    @Parameter(names = {"--max-conn-per-route"}, arity = 1,
 +               description = "Max number of HTTP connections to each route")
 +    public int maxConnectionsPerRoute = CPUS * 2;
 +
 +    @Parameter(names = {"--batch-size"}, arity = 1,
 +               validateWith = {PositiveValidator.class},
 +               description = "The number of lines in each submit")
 +    public int batchSize = 500;
 +
 +    @Parameter(names = {"--cdc-flush-interval"}, arity = 1,
 +               description = "The flush interval for flink cdc")
 +    public int flushIntervalMs = 30000;
 +
 +    @Parameter(names = {"--cdc-sink-parallelism"}, arity = 1,
 +               description = "The sink parallelism for flink cdc")
 +    public int sinkParallelism = 1;
 +
 +    @Parameter(names = {"--shutdown-timeout"}, arity = 1,
 +               validateWith = {PositiveValidator.class},
 +               description = "The timeout of awaitTermination in seconds")
 +    public int shutdownTimeout = 10;
 +
 +    @Parameter(names = {"--check-vertex"}, arity = 1,
 +               description = "Check vertices exists while inserting edges")
 +    public boolean checkVertex = false;
 +
 +    @Parameter(names = {"--max-read-errors"}, arity = 1,
 +               validateWith = {PositiveValidator.class},
 +               description = "The maximum number of lines that read error " +
 +                             "before exiting")
 +    public int maxReadErrors = 1;
 +
 +    @Parameter(names = {"--max-parse-errors"}, arity = 1,
 +               validateWith = {PositiveValidator.class},
 +               description = "The maximum number of lines that parse error " +
 +                             "before exiting")
 +    public int maxParseErrors = 1;
 +
 +    @Parameter(names = {"--max-insert-errors"}, arity = 1,
 +               validateWith = {PositiveValidator.class},
 +               description = "The maximum number of lines that insert error " +
 +                             "before exiting")
 +    public int maxInsertErrors = 500;
 +
 +    @Parameter(names = {"--timeout"}, arity = 1,
 +               validateWith = {PositiveValidator.class},
 +               description = "The timeout of HugeClient request")
 +    public int timeout = 60;
 +
 +    @Parameter(names = {"--retry-times"}, arity = 1,
 +               validateWith = {PositiveValidator.class},
 +               description = "Setting the max retry times when loading timeout")
 +    public int retryTimes = 3;
 +
 +    @Parameter(names = {"--retry-interval"}, arity = 1,
 +               validateWith = {PositiveValidator.class},
 +               description = "Setting the interval time before retrying")
 +    public int retryInterval = 10;
 +
 +    @Parameter(names = {"--max-read-lines"}, arity = 1,
 +               validateWith = {PositiveValidator.class},
 +               description = "The maximum number of read lines, when reached " +
 +                             "this number, the load task will stop")
 +    public long maxReadLines = -1L;
 +
 +    @Parameter(names = {"--dry-run"}, arity = 1,
 +               description = "Dry run means that only parse but doesn't load")
 +    public boolean dryRun = false;
 +
 +    @Parameter(names = {"--print-progress"}, arity = 1,
 +               description = "Whether to print real-time load progress")
 +    public boolean printProgress = true;
 +
 +    @Parameter(names = {"--test-mode"}, arity = 1,
 +               description = "Whether the hugegraph-loader work in test mode")
 +    public boolean testMode = false;
 +
 +    @Parameter(names = {"--help"}, help = true,
 +               description = "Print usage of HugeGraphLoader")
 +    public boolean help;
 +
 +    @Parameter(names = {"--sink-type"}, arity = 1,
 +               description = "Sink to different storage")
 +    public boolean sinkType = true;
 +
 +    @Parameter(names = {"--edge-partitions"}, arity = 1,
 +               description = "The number of partitions of the HBase edge table")
 +    public int edgePartitions = 64;
 +
 +    @Parameter(names = {"--vertex-partitions"}, arity = 1,
 +               description = "The number of partitions of the HBase vertex table")
 +    public int vertexPartitions = 64;
 +
 +    @Parameter(names = {"edgeTablename"}, arity = 1,
 +               description = "edgeTablename")
 +    public String edgeTablename;
 +    @Parameter(names = {"vertexTablename"}, arity = 1,
 +               description = "vertexTablename")
 +    public String vertexTablename;
 +    @Parameter(names = {"hbaseZKQuorum"}, arity = 1,
 +               description = "hbaseZKQuorum")
 +    public String hbaseZKQuorum;
 +    @Parameter(names = {"hbaseZKPort"}, arity = 1,
 +               description = "hbaseZKPort")
 +    public String hbaseZKPort;
 +    @Parameter(names = {"hbaseZKParent"}, arity = 1,
 +               description = "hbaseZKParent")
 +    public String hbaseZKParent;
 +
 +    public String workModeString() {
 +        if (this.incrementalMode) {
 +            return "INCREMENTAL MODE";
 +        } else if (this.failureMode) {
 +            return "FAILURE MODE";
 +        } else {
 +            return "NORMAL MODE";
 +        }
 +    }
 +
 +    public static LoadOptions parseOptions(String[] args) {
 +        LoadOptions options = new LoadOptions();
 +        JCommander commander = JCommander.newBuilder()
 +                                         .addObject(options)
 +                                         .build();
 +        commander.parse(args);
 +        // Print usage and exit
 +        if (options.help) {
 +            LoadUtil.exitWithUsage(commander, Constants.EXIT_CODE_NORM);
 +        }
 +        // Check options
 +        // Check option "-f"
 +        E.checkArgument(!StringUtils.isEmpty(options.file),
 +                        "The mapping file must be specified");
 +        E.checkArgument(options.file.endsWith(Constants.JSON_SUFFIX),
 +                        "The mapping file name must be end with %s",
 +                        Constants.JSON_SUFFIX);
 +        File mappingFile = new File(options.file);
 +        if (!mappingFile.canRead()) {
 +            LOG.error("The mapping file must be readable: '{}'", mappingFile);
 +            LoadUtil.exitWithUsage(commander, Constants.EXIT_CODE_ERROR);
 +        }
 +
 +        // Check option "-g"
 +        E.checkArgument(!StringUtils.isEmpty(options.graph),
 +                        "The graph must be specified");
 +        // Check option "-h"
 +        if (!options.host.startsWith(Constants.HTTP_PREFIX)) {
 +            if (options.protocol.equals(HTTP_SCHEMA)) {
 +                options.host = Constants.HTTP_PREFIX + options.host;
 +            } else {
 +                options.host = Constants.HTTPS_PREFIX + options.host;
 +            }
 +        }
 +        // Check option --incremental-mode and --failure-mode
 +        E.checkArgument(!(options.incrementalMode && options.failureMode),
 +                        "The option --incremental-mode and --failure-mode " +
 +                        "can't be true at same time");
 +        if (options.failureMode) {
 +            LOG.info("The failure-mode will scan the entire error file");
 +            options.maxReadErrors = Constants.NO_LIMIT;
 +            options.maxParseErrors = Constants.NO_LIMIT;
 +            options.maxInsertErrors = Constants.NO_LIMIT;
 +        }
 +        return options;
 +    }
 +
 +    public void copyBackendStoreInfo (BackendStoreInfo backendStoreInfo) {
-         E.checkArgument(null != backendStoreInfo,"the backendStoreInfo is null ");
++        E.checkArgument(null != backendStoreInfo, "The backendStoreInfo can't be null");
 +        this.edgeTablename = backendStoreInfo.getEdgeTablename();
 +        this.vertexTablename = backendStoreInfo.getVertexTablename();
 +        this.hbaseZKParent = backendStoreInfo.getHbaseZKParent();
 +        this.hbaseZKPort = backendStoreInfo.getHbaseZKPort();
 +        this.hbaseZKQuorum = backendStoreInfo.getHbaseZKQuorum();
 +    }
 +
 +    public static class UrlValidator implements IParameterValidator {
 +
 +        @Override
 +        public void validate(String name, String value) {
 +            String regex = "^((http)(s?)://)?" +
 +                           "(([0-9]{1,3}\\.){3}[0-9]{1,3}" + // IP URL
 +                           "|" +                             // Or domain name
 +                           "([0-9a-z_!~*'()-]+\\.)*[0-9a-z_!~*'()-]+)$";
 +            if (!value.matches(regex)) {
 +                throw new ParameterException(String.format(
 +                          "Invalid url value of args '%s': '%s'", name, value));
 +            }
 +        }
 +    }
 +
 +    public static class ProtocolValidator implements IParameterValidator {
 +
 +        private static final Set<String> SSL_PROTOCOL = ImmutableSet.of(
 +                HTTP_SCHEMA, HTTPS_SCHEMA
 +        );
 +
 +        @Override
 +        public void validate(String name, String value) {
 +            if (!SSL_PROTOCOL.contains(value.toLowerCase())) {
 +                throw new ParameterException(String.format(
 +                          "Invalid --protocol '%s', valid value is %s",
 +                          value, SSL_PROTOCOL));
 +            }
 +        }
 +    }
 +
 +    public static class DirectoryValidator implements IParameterValidator {
 +
 +        @Override
 +        public void validate(String name, String value) {
 +            File file = new File(value);
 +            if (!file.exists() || !file.isDirectory()) {
 +                throw new ParameterException(String.format(
 +                          "Ensure the directory exists and is indeed a " +
 +                          "directory instead of a file: '%s'", value));
 +            }
 +        }
 +    }
 +
 +    public static class FileValidator implements IParameterValidator {
 +
 +        @Override
 +        public void validate(String name, String value) {
 +            File file = new File(value);
 +            if (!file.exists() || !file.isFile()) {
 +                throw new ParameterException(String.format(
 +                          "Ensure the file exists and is indeed a file " +
 +                          "instead of a directory: '%s'", value));
 +            }
 +        }
 +    }
 +
 +    public static class PositiveValidator implements IParameterValidator {
 +
 +        @Override
 +        public void validate(String name, String value) {
 +            int retry = Integer.parseInt(value);
 +            if (retry <= 0) {
 +                throw new ParameterException(String.format(
 +                          "Parameter '%s' should be positive, but got '%s'",
 +                          name, value));
 +            }
 +        }
 +    }
 +}
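
For reference, a minimal sketch (not part of this commit) of how the new HBase-related
options above could be exercised. The flag spellings come from the @Parameter
definitions in LoadOptions; the file and graph names are hypothetical placeholders.

    // Sketch only: parse the new flags added for the HBase bulkload path.
    // "struct.json" and "hugegraph" are placeholder values.
    String[] cliArgs = {
            "-f", "struct.json",           // mapping description file, must end with .json
            "-g", "hugegraph",             // graph namespace
            "--sink-type", "false",        // false selects the HBase bulkload (gen HFile) path
            "--vertex-partitions", "64",
            "--edge-partitions", "64"
    };
    LoadOptions options = LoadOptions.parseOptions(cliArgs);
    System.out.println(options.workModeString());   // NORMAL MODE unless other flags are set
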
diff --cc hugegraph-loader/src/main/java/org/apache/hugegraph/loader/mapping/LoadMapping.java
index a83fd835,00000000..7a403d25
mode 100644,000000..100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/mapping/LoadMapping.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/mapping/LoadMapping.java
@@@ -1,201 -1,0 +1,201 @@@
 +/*
 + * Copyright 2017 HugeGraph Authors
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership. The ASF
 + * licenses this file to You under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.hugegraph.loader.mapping;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.file.Paths;
 +import java.util.ArrayList;
 +import java.util.LinkedHashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.stream.Collectors;
 +
 +import org.apache.commons.collections.CollectionUtils;
 +import org.apache.commons.io.FileUtils;
 +import org.apache.commons.lang3.StringUtils;
 +
 +import org.apache.hugegraph.loader.constant.Checkable;
 +import org.apache.hugegraph.loader.constant.Constants;
 +import org.apache.hugegraph.loader.exception.LoadException;
 +import org.apache.hugegraph.loader.executor.LoadOptions;
 +import org.apache.hugegraph.loader.util.JsonUtil;
 +import org.apache.hugegraph.loader.util.LoadUtil;
 +import org.apache.hugegraph.loader.util.MappingUtil;
 +import org.apache.hugegraph.loader.source.file.FileSource;
 +import org.apache.hugegraph.util.E;
 +import com.fasterxml.jackson.annotation.JsonCreator;
 +import com.fasterxml.jackson.annotation.JsonProperty;
 +import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 +
 +@JsonPropertyOrder({"version", "structs"})
 +public class LoadMapping implements Checkable {
 +
 +    @JsonProperty("version")
 +    private String version;
 +    @JsonProperty("structs")
 +    private List<InputStruct> structs;
 +    @JsonProperty("backendStoreInfo")
 +    private BackendStoreInfo backendStoreInfo;
 +
 +    public BackendStoreInfo getBackendStoreInfo() {
 +        return backendStoreInfo;
 +    }
 +
 +    public static LoadMapping of(String filePath) {
 +        File file = FileUtils.getFile(filePath);
 +        LoadMapping mapping;
 +        try {
 +            String json = FileUtils.readFileToString(file, Constants.CHARSET);
 +            mapping = MappingUtil.parse(json);
 +        } catch (IOException e) {
 +            throw new LoadException("Failed to read mapping file '%s'",
 +                                    e, filePath);
 +        } catch (IllegalArgumentException e) {
 +            throw new LoadException("Failed to parse mapping file '%s'",
 +                                    e, filePath);
 +        }
 +        try {
 +            mapping.check();
 +        } catch (IllegalArgumentException e) {
 +            throw new LoadException("Invalid mapping file '%s'", e, filePath);
 +        }
 +        return mapping;
 +    }
 +
 +    @JsonCreator
 +    public LoadMapping(@JsonProperty("structs") List<InputStruct> structs) {
 +        this.version = Constants.V2_STRUCT_VERSION;
 +        this.structs = structs;
 +    }
 +
 +    @JsonCreator
 +    public LoadMapping(@JsonProperty("structs") List<InputStruct> structs,
-                           @JsonProperty("backendStoreInfo") BackendStoreInfo backendStoreInfo) {
++                       @JsonProperty("backendStoreInfo") BackendStoreInfo backendStoreInfo) {
 +        this.version = Constants.V2_STRUCT_VERSION;
 +        this.structs = structs;
 +        this.backendStoreInfo = backendStoreInfo;
 +    }
 +
 +    @Override
 +    public void check() throws IllegalArgumentException {
 +        E.checkArgument(!StringUtils.isEmpty(this.version),
 +                        "The version can't be null or empty");
 +        E.checkArgument(this.version.equals(Constants.V2_STRUCT_VERSION),
 +                        "The version must be '%s', but got '%s'",
 +                        Constants.V2_STRUCT_VERSION, this.version);
 +        E.checkArgument(!CollectionUtils.isEmpty(this.structs),
 +                        "The structs can't be null or empty");
 +        this.structs.forEach(InputStruct::check);
 +        Set<String> uniqueIds = this.structs.stream().map(InputStruct::id)
 +                                            .collect(Collectors.toSet());
 +        E.checkArgument(this.structs.size() == uniqueIds.size(),
 +                        "The structs cannot contain the same id mapping");
 +    }
 +
 +    public List<InputStruct> structs() {
 +        return this.structs;
 +    }
 +
 +    public List<InputStruct> structsForFailure(LoadOptions options) {
 +        List<InputStruct> targetStructs = new ArrayList<>();
 +        String dir = LoadUtil.getStructDirPrefix(options);
 +        String path = Paths.get(dir, Constants.FAILURE_DATA).toString();
 +        File pathDir = FileUtils.getFile(path);
 +        // It means no failure data if the path directory does not exist
 +        if (!pathDir.exists()) {
 +            return targetStructs;
 +        }
 +        Map<String, FailureFile> failureFiles = this.groupFailureFiles(pathDir);
 +        for (String inputId : failureFiles.keySet()) {
 +            InputStruct struct = this.struct(inputId);
 +            String charset = struct.input().charset();
 +            FailureFile failureFile = failureFiles.get(inputId);
 +
 +            FileSource source = struct.input().asFileSource();
 +            if (failureFile.headerFile != null) {
 +                // It means that header file existed
 +                String json;
 +                try {
 +                    json = FileUtils.readFileToString(failureFile.headerFile,
 +                                                      charset);
 +                } catch (IOException e) {
 +                    throw new LoadException("Failed to read header file %s",
 +                                            failureFile.headerFile);
 +                }
 +                List<String> header = JsonUtil.convertList(json, String.class);
 +                source.header(header.toArray(new String[] {}));
 +            }
 +            // Set failure data path
 +            source.path(failureFile.dataFile.getAbsolutePath());
 +            source.skippedLine().regex(Constants.SKIPPED_LINE_REGEX);
 +            struct.input(source);
 +            // Add to target structs
 +            targetStructs.add(struct);
 +        }
 +        return targetStructs;
 +    }
 +
 +    private Map<String, FailureFile> groupFailureFiles(File pathDir) {
 +        File[] subFiles = pathDir.listFiles();
 +        E.checkArgument(subFiles != null && subFiles.length >= 1,
 +                        "Every input struct should have a failure data file, " +
 +                        "and a header file if need it");
 +        Map<String, FailureFile> failureFiles = new LinkedHashMap<>();
 +        for (File subFile : subFiles) {
 +            String inputId = LoadUtil.getFileNamePrefix(subFile);
 +            String suffix = LoadUtil.getFileNameSuffix(subFile);
 +            FailureFile failureFile = failureFiles.get(inputId);
 +            if (failureFile == null) {
 +                failureFile = new FailureFile();
 +            }
 +            if (Constants.FAILURE_SUFFIX.equals(suffix)) {
 +                failureFile.dataFile = subFile;
 +            } else {
 +                E.checkArgument(Constants.HEADER_SUFFIX.equals(suffix),
 +                                "The failure data file must end with %s or %s",
 +                                Constants.FAILURE_SUFFIX,
 +                                Constants.HEADER_SUFFIX);
 +                failureFile.headerFile = subFile;
 +            }
 +            failureFiles.put(inputId, failureFile);
 +        }
 +        return failureFiles;
 +    }
 +
 +    public InputStruct struct(String id) {
 +        for (InputStruct struct : this.structs) {
 +            if (struct.id().equals(id)) {
 +                return struct;
 +            }
 +        }
 +        throw new IllegalArgumentException(String.format(
 +                  "There is no input struct with id '%s'", id));
 +    }
 +
 +    private static class FailureFile {
 +
 +        // Maybe null
 +        private File headerFile;
 +        // Can't be null
 +        private File dataFile;
 +    }
 +}
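
As context for the bulkload branch, a hedged usage sketch of the mapping side,
assuming a v2 mapping file that carries a backendStoreInfo block (the path below
is a hypothetical placeholder):

    // Sketch only: read a mapping file and inspect its backend store info,
    // which LoadOptions.copyBackendStoreInfo() consumes when --sink-type is false.
    LoadMapping mapping = LoadMapping.of("struct-v2.json");   // hypothetical path
    BackendStoreInfo info = mapping.getBackendStoreInfo();
    if (info != null) {
        System.out.println(info.getEdgeTablename() + " / " + info.getVertexTablename());
    }
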
diff --cc hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
index 5a8c2a46,00000000..0a7d1f63
mode 100644,000000..100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
@@@ -1,338 -1,0 +1,335 @@@
 +/*
 + * Copyright 2017 HugeGraph Authors
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership. The ASF
 + * licenses this file to You under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.hugegraph.loader.spark;
 +
 +import org.apache.hugegraph.driver.GraphManager;
 +import org.apache.hugegraph.loader.builder.EdgeBuilder;
 +import org.apache.hugegraph.loader.builder.ElementBuilder;
 +import org.apache.hugegraph.loader.builder.VertexBuilder;
 +import org.apache.hugegraph.loader.direct.loader.HBaseDirectLoader;
 +import org.apache.hugegraph.loader.executor.LoadContext;
 +import org.apache.hugegraph.loader.executor.LoadOptions;
 +import org.apache.hugegraph.loader.metrics.LoadDistributeMetrics;
 +import org.apache.hugegraph.loader.source.InputSource;
 +import org.apache.hugegraph.loader.source.SourceType;
 +import org.apache.hugegraph.loader.source.jdbc.JDBCSource;
 +import org.apache.hugegraph.loader.util.Printer;
 +import org.apache.hugegraph.loader.mapping.EdgeMapping;
 +import org.apache.hugegraph.loader.mapping.ElementMapping;
 +import org.apache.hugegraph.loader.mapping.InputStruct;
 +import org.apache.hugegraph.loader.mapping.LoadMapping;
 +import org.apache.hugegraph.loader.mapping.VertexMapping;
 +import org.apache.hugegraph.loader.source.file.Compression;
 +import org.apache.hugegraph.loader.source.file.FileFilter;
 +import org.apache.hugegraph.loader.source.file.FileFormat;
 +import org.apache.hugegraph.loader.source.file.FileSource;
 +import org.apache.hugegraph.loader.source.file.SkippedLine;
 +import org.apache.hugegraph.structure.GraphElement;
 +import org.apache.hugegraph.structure.graph.BatchEdgeRequest;
 +import org.apache.hugegraph.structure.graph.BatchVertexRequest;
 +import org.apache.hugegraph.structure.graph.Edge;
 +import org.apache.hugegraph.structure.graph.UpdateStrategy;
 +import org.apache.hugegraph.structure.graph.Vertex;
 +import org.apache.hugegraph.util.Log;
 +
 +import org.apache.spark.SparkConf;
 +import org.apache.spark.SparkContext;
 +import org.apache.spark.sql.DataFrameReader;
 +import org.apache.spark.sql.Dataset;
 +import org.apache.spark.sql.Row;
 +import org.apache.spark.sql.SparkSession;
 +import org.apache.spark.sql.types.StructField;
 +import org.apache.spark.util.LongAccumulator;
 +import org.slf4j.Logger;
 +
 +import java.io.Serializable;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Properties;
 +
 +import scala.collection.JavaConverters;
 +
 +public class HugeGraphSparkLoader implements Serializable {
 +
 +    public static final Logger LOG = Log.logger(HugeGraphSparkLoader.class);
 +
 +    private final LoadOptions loadOptions;
 +    private final Map<ElementBuilder, List<GraphElement>> builders;
 +
 +    public static void main(String[] args) {
 +        HugeGraphSparkLoader loader;
 +        try {
 +            loader = new HugeGraphSparkLoader(args);
 +        } catch (Throwable e) {
 +            Printer.printError("Failed to start loading", e);
 +            return;
 +        }
 +        loader.load();
 +    }
 +
 +    public HugeGraphSparkLoader(String[] args) {
 +        this.loadOptions = LoadOptions.parseOptions(args);
 +        this.builders = new HashMap<>();
 +    }
 +
 +    public void load() {
 +        LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
 +        List<InputStruct> structs = mapping.structs();
 +        boolean sinkType = this.loadOptions.sinkType;
-         if(!sinkType){
++        if(!sinkType) {
 +            this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
 +        }
 +        SparkConf conf = new SparkConf()
 +                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Kryo serialization
 +                .set("spark.kryo.registrationRequired", "true");
 +        try {
-             conf.registerKryoClasses(
-                     new Class[]
-                     {
-                         org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
-                         org.apache.hadoop.hbase.KeyValue.class,
-                         org.apache.spark.sql.types.StructType.class,
-                         StructField[].class,
-                         StructField.class,
-                         org.apache.spark.sql.types.LongType$.class,
-                         org.apache.spark.sql.types.Metadata.class,
-                         org.apache.spark.sql.types.StringType$.class,
-                 Class.forName(
-                     "org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
-                 Class.forName("scala.reflect.ClassTag$$anon$1"),
-                 Class.forName("scala.collection.immutable.Set$EmptySet$"),
-                 Class.forName("org.apache.spark.sql.types.DoubleType$")
-                     });
++            conf.registerKryoClasses(new Class[] {
++                    org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
++                    org.apache.hadoop.hbase.KeyValue.class,
++                    org.apache.spark.sql.types.StructType.class,
++                    StructField[].class,
++                    StructField.class,
++                    org.apache.spark.sql.types.LongType$.class,
++                    org.apache.spark.sql.types.Metadata.class,
++                    org.apache.spark.sql.types.StringType$.class,
++                    Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
++                    Class.forName("scala.reflect.ClassTag$$anon$1"),
++                    Class.forName("scala.collection.immutable.Set$EmptySet$"),
++                    Class.forName("org.apache.spark.sql.types.DoubleType$")
++                 });
 +        } catch (ClassNotFoundException e) {
 +            LOG.error("spark kryo serialized registration failed");
 +        }
 +        SparkSession session = SparkSession.builder()
 +                                           .config(conf)
 +                                           .getOrCreate();
 +        SparkContext sc = session.sparkContext();
 +
 +        LongAccumulator totalInsertSuccess = sc.longAccumulator("totalInsertSuccess");
 +        for (InputStruct struct : structs) {
 +            LOG.info("\n Initializes the accumulator corresponding to the  {} ",
-                     struct.input().asFileSource().path());
++                     struct.input().asFileSource().path());
 +            LoadDistributeMetrics loadDistributeMetrics = new LoadDistributeMetrics(struct);
 +            loadDistributeMetrics.init(sc);
-             LOG.info("\n   Start to load data, data info is: \t {} ",
-                     struct.input().asFileSource().path());
++            LOG.info("\n  Start to load data, data info is: \t {} ",
++                     struct.input().asFileSource().path());
 +            Dataset<Row> ds = read(session, struct);
 +            if (sinkType) {
 +                LOG.info("\n  Start to load data using spark apis  \n");
 +                ds.foreachPartition((Iterator<Row> p) -> {
 +                    LoadContext context = initPartition(this.loadOptions, struct);
 +                    p.forEachRemaining((Row row) -> {
 +                        loadRow(struct, row, p, context);
 +                    });
 +                    context.close();
 +                });
 +
 +            } else {
 +                LOG.info("\n Start to load data using spark bulkload     \n");
 +                // gen-hfile
 +                HBaseDirectLoader directLoader = new HBaseDirectLoader(loadOptions,
 +                        struct,loadDistributeMetrics);
 +                directLoader.bulkload(ds);
 +
 +            }
 +            collectLoadMetrics(loadDistributeMetrics,totalInsertSuccess);
-             LOG.info("    \n   Finished  load {}  data ",
-                     struct.input().asFileSource().path());
++            LOG.info("\n Finished  load {}  data ",
++                     struct.input().asFileSource().path());
 +        }
 +        Long totalInsertSuccessCnt = totalInsertSuccess.value();
 +        LOG.info("\n ------------The data load task is complete-------------------\n" +
-                 "\n  insertSuccesscnt:\t {}" +
-                 "\n ---------------------------------------------\n"
-                 , totalInsertSuccessCnt);
++                 "\n  insertSuccesscnt:\t {}" +
++                 "\n ---------------------------------------------\n"
++                 , totalInsertSuccessCnt);
 +
 +        sc.stop();
 +        session.close();
 +        session.stop();
 +    }
 +
 +    private void collectLoadMetrics(LoadDistributeMetrics loadMetrics,
 +                                    LongAccumulator totalInsertSuccess) {
 +        Long edgeInsertSuccess = loadMetrics.readEdgeInsertSuccess();
 +        Long vertexInsertSuccess = loadMetrics.readVertexInsertSuccess();
 +        totalInsertSuccess.add(edgeInsertSuccess);
 +        totalInsertSuccess.add(vertexInsertSuccess);
 +    }
 +
 +    private LoadContext initPartition(
 +            LoadOptions loadOptions, InputStruct struct) {
 +        LoadContext context = new LoadContext(loadOptions);
 +        for (VertexMapping vertexMapping : struct.vertices()) {
 +            this.builders.put(new VertexBuilder(context, struct, vertexMapping),
 +                              new ArrayList<>());
 +        }
 +        for (EdgeMapping edgeMapping : struct.edges()) {
 +            this.builders.put(new EdgeBuilder(context, struct, edgeMapping),
 +                              new ArrayList<>());
 +        }
 +        context.updateSchemaCache();
 +        return context;
 +    }
 +
 +    private void loadRow(InputStruct struct, Row row, Iterator<Row> p,
 +                         LoadContext context) {
 +        for (Map.Entry<ElementBuilder, List<GraphElement>> builderMap :
 +                this.builders.entrySet()) {
 +            ElementMapping elementMapping = builderMap.getKey().mapping();
 +            // Parse
 +            if (elementMapping.skip()) {
 +                continue;
 +            }
 +            parse(row, builderMap, struct);
 +
 +            // Insert
 +            List<GraphElement> graphElements = builderMap.getValue();
 +            if (graphElements.size() >= elementMapping.batchSize() ||
 +                (!p.hasNext() && graphElements.size() > 0)) {
 +                flush(builderMap, context.client().graph(), this.loadOptions.checkVertex);
 +            }
 +        }
 +    }
 +
 +    private Dataset<Row> read(SparkSession ss, InputStruct struct) {
 +        InputSource input = struct.input();
 +        String charset = input.charset();
 +        DataFrameReader reader = ss.read();
 +        Dataset<Row> ds;
 +        switch (input.type()) {
 +            case FILE:
 +            case HDFS:
 +                FileSource fileSource = input.asFileSource();
 +                String[] header = fileSource.header();
 +                String delimiter = fileSource.delimiter();
 +                String path = fileSource.path();
 +                FileFilter filter = fileSource.filter();
 +                FileFormat format = fileSource.format();
 +                String dateFormat = fileSource.dateFormat();
 +                String timeZone = fileSource.timeZone();
 +                SkippedLine skippedLine = fileSource.skippedLine();
 +                Compression compression = fileSource.compression();
 +                int batchSize = fileSource.batchSize();
 +                switch (format) {
 +                    case TEXT:
 +                        ds = reader.text(path);
 +                        break;
 +                    case JSON:
 +                        ds = reader.json(path);
 +                        break;
 +                    case CSV:
 +                        ds = reader.csv(path);
 +                        break;
 +                    default:
 +                        throw new IllegalStateException(
 +                                  "Unexpected format value: " + format);
 +                }
 +                break;
 +            case JDBC:
 +                JDBCSource jdbcSource = (JDBCSource) struct.input();
 +                String url = jdbcSource.url() + "/" + jdbcSource.database();
 +                String table = jdbcSource.table();
 +                String username = jdbcSource.username();
 +                String password = jdbcSource.password();
 +                Properties properties = new Properties();
 +                properties.put("user", username);
 +                properties.put("password", password);
 +                ds = reader.jdbc(url, table, properties);
 +                break;
 +            default:
 +                throw new AssertionError(String.format(
 +                          "Unsupported input source '%s'", input.type()));
 +        }
 +        return ds;
 +    }
 +
 +    private void parse(Row row,
 +                       Map.Entry<ElementBuilder, List<GraphElement>> builderMap,
 +                       InputStruct struct) {
 +        ElementBuilder builder = builderMap.getKey();
 +        List<GraphElement> graphElements = builderMap.getValue();
 +        if ("".equals(row.mkString())) {
 +            return;
 +        }
 +        List<GraphElement> elements;
 +        switch (struct.input().type()) {
 +            case FILE:
 +            case HDFS:
 +                FileSource fileSource = struct.input().asFileSource();
 +                String delimiter = fileSource.delimiter();
 +                elements = builder.build(fileSource.header(),
 +                                         row.mkString(delimiter).split(delimiter));
 +                break;
 +            case JDBC:
 +                Object[] structFields = JavaConverters.asJavaCollection(row.schema().toList())
 +                                                      .toArray();
 +                int len = row.schema().length();
 +                String[] headers = new String[len];
 +                Object[] values = new Object[len];
 +                for (int i = 0; i < len; i++) {
 +                    headers[i] = ((StructField) structFields[i]).name();
 +                    values[i] = row.get(i);
 +                }
 +                elements = builder.build(headers, values);
 +                break;
 +            default:
 +                throw new AssertionError(String.format(
 +                          "Unsupported input source '%s'",
 +                          struct.input().type()));
 +        }
 +        graphElements.addAll(elements);
 +    }
 +
 +    private void flush(Map.Entry<ElementBuilder, List<GraphElement>> builderMap,
 +                       GraphManager g, boolean isCheckVertex) {
 +        ElementBuilder builder = builderMap.getKey();
 +        ElementMapping elementMapping = builder.mapping();
 +        List<GraphElement> graphElements = builderMap.getValue();
 +        boolean isVertex = builder.mapping().type().isVertex();
 +        Map<String, UpdateStrategy> updateStrategyMap = elementMapping.updateStrategies();
 +        if (updateStrategyMap.isEmpty()) {
 +            if (isVertex) {
 +                g.addVertices((List<Vertex>) (Object) graphElements);
 +            } else {
 +                g.addEdges((List<Edge>) (Object) graphElements);
 +            }
 +        } else {
 +            // CreateIfNotExist does not support false now
 +            if (isVertex) {
 +                BatchVertexRequest.Builder req =
 +                        new BatchVertexRequest.Builder();
 +                req.vertices((List<Vertex>) (Object) graphElements)
 +                   .updatingStrategies(updateStrategyMap)
 +                   .createIfNotExist(true);
 +                g.updateVertices(req.build());
 +            } else {
 +                BatchEdgeRequest.Builder req = new BatchEdgeRequest.Builder();
 +                req.edges((List<Edge>) (Object) graphElements)
 +                   .updatingStrategies(updateStrategyMap)
 +                   .checkVertex(isCheckVertex)
 +                   .createIfNotExist(true);
 +                g.updateEdges(req.build());
 +            }
 +        }
 +        graphElements.clear();
 +    }
 +}
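
The Kryo settings used above, restated in isolation as a sketch: with
spark.kryo.registrationRequired set to true, every class that crosses a
serialization boundary must be registered explicitly, which is why the loader
registers the HBase and Spark SQL types before building the session.

    // Sketch only, reduced to a single registered class for brevity.
    SparkConf conf = new SparkConf()
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryo.registrationRequired", "true")
            .registerKryoClasses(new Class[] { org.apache.hadoop.hbase.KeyValue.class });
    SparkSession session = SparkSession.builder().config(conf).getOrCreate();
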
diff --cc hugegraph-loader/src/main/java/org/apache/hugegraph/loader/util/MappingUtil.java
index 5bd405e4,00000000..47e7bf06
mode 100644,000000..100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/util/MappingUtil.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/util/MappingUtil.java
@@@ -1,188 -1,0 +1,188 @@@
 +/*
 + * Copyright 2017 HugeGraph Authors
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership. The ASF
 + * licenses this file to You under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.hugegraph.loader.util;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.commons.io.FileUtils;
 +
 +import org.apache.hugegraph.loader.constant.Constants;
 +import org.apache.hugegraph.loader.exception.LoadException;
 +import org.apache.hugegraph.loader.mapping.EdgeMapping;
 +import org.apache.hugegraph.loader.mapping.ElementMapping;
 +import org.apache.hugegraph.loader.mapping.InputStruct;
 +import org.apache.hugegraph.loader.mapping.LoadMapping;
 +import org.apache.hugegraph.loader.mapping.VertexMapping;
 +import org.apache.hugegraph.loader.source.InputSource;
 +import org.apache.hugegraph.loader.source.SourceType;
 +import org.apache.hugegraph.loader.source.file.FileSource;
 +import org.apache.hugegraph.loader.struct.EdgeStructV1;
 +import org.apache.hugegraph.loader.struct.ElementStructV1;
 +import org.apache.hugegraph.loader.struct.GraphStructV1;
 +import org.apache.hugegraph.loader.struct.VertexStructV1;
 +import org.apache.hugegraph.util.E;
 +import org.apache.hugegraph.util.InsertionOrderUtil;
 +import com.google.common.collect.ImmutableSet;
 +
 +@SuppressWarnings("deprecation")
 +public final class MappingUtil {
 +
 +    private static final Set<String> ACCEPTABLE_VERSIONS = ImmutableSet.of(
 +            Constants.V1_STRUCT_VERSION, Constants.V2_STRUCT_VERSION
 +    );
 +
 +    public static void write(LoadMapping mapping, String path) {
 +        File file = FileUtils.getFile(path);
 +        String json = JsonUtil.toJson(mapping);
 +        try {
 +            FileUtils.write(file, json, Constants.CHARSET);
 +        } catch (IOException e) {
 +            throw new LoadException("Failed to write mapping %s to file '%s'",
 +                                    e, mapping, file);
 +        }
 +    }
 +
 +    public static LoadMapping parse(String json) {
 +        Map<String, Object> map = JsonUtil.convertMap(json, String.class,
 +                                                      Object.class);
 +        Object value = map.get(Constants.FIELD_VERSION);
 +        if (value == null) {
 +            value = Constants.V1_STRUCT_VERSION;
 +        } else {
 +            E.checkArgument(value instanceof String,
 +                            "The version value must be String class, " +
 +                            "but got '%s(%s)'", value, value.getClass());
 +        }
 +        String version = (String) value;
 +
 +        E.checkArgument(ACCEPTABLE_VERSIONS.contains(version),
 +                        "Invalid version '%s', the acceptable versions are %s",
 +                        version, ACCEPTABLE_VERSIONS);
 +        if (version.equals(Constants.V2_STRUCT_VERSION)) {
 +            return JsonUtil.fromJson(json, LoadMapping.class);
 +        } else {
 +            assert version.equals(Constants.V1_STRUCT_VERSION);
 +            return parseV1(json);
 +        }
 +    }
 +
 +    private static LoadMapping parseV1(String json) {
 +        GraphStructV1 graphStruct = JsonUtil.fromJson(json,
 +                                                      GraphStructV1.class);
 +        Map<FileSourceKey, InputStruct> fileSourceInputStructs =
 +                                        InsertionOrderUtil.newMap();
 +        List<InputStruct> jdbcSourceInputStructs = new ArrayList<>();
 +        for (ElementStructV1 originStruct : graphStruct.structs()) {
 +            InputSource inputSource = originStruct.input();
 +            ElementMapping targetStruct = convertV1ToV2(originStruct);
 +
 +            SourceType type = inputSource.type();
 +            if (type == SourceType.FILE || type == SourceType.HDFS) {
 +                FileSource source = (FileSource) inputSource;
 +                FileSourceKey key = new FileSourceKey(type, source.path());
 +                fileSourceInputStructs.compute(key, (k, inputStruct) -> {
 +                    if (inputStruct == null) {
 +                        inputStruct = new InputStruct(null, null);
 +                        inputStruct.input(source);
 +                    }
 +                    inputStruct.add(targetStruct);
 +                    return inputStruct;
 +                });
 +            } else {
 +                assert type == SourceType.JDBC;
 +                InputStruct inputStruct = new InputStruct(null, null);
 +                inputStruct.input(inputSource);
 +                inputStruct.add(targetStruct);
 +                jdbcSourceInputStructs.add(inputStruct);
 +            }
 +        }
 +        // Generate id for every input mapping
 +        List<InputStruct> inputStructs = new ArrayList<>();
 +        int id = 0;
 +        for (InputStruct inputStruct : fileSourceInputStructs.values()) {
 +            inputStruct.id(String.valueOf(++id));
 +            inputStructs.add(inputStruct);
 +        }
 +        for (InputStruct inputStruct : jdbcSourceInputStructs) {
 +            inputStruct.id(String.valueOf(++id));
 +            inputStructs.add(inputStruct);
 +        }
-         return new LoadMapping(inputStructs,graphStruct.getBackendStoreInfo());
++        return new LoadMapping(inputStructs, graphStruct.getBackendStoreInfo());
 +    }
 +
 +    private static ElementMapping convertV1ToV2(ElementStructV1 origin) {
 +        ElementMapping target;
 +        if (origin.type().isVertex()) {
 +            VertexStructV1 originVertex = (VertexStructV1) origin;
 +            target = new VertexMapping(originVertex.idField(),
 +                                       originVertex.unfold());
 +        } else {
 +            EdgeStructV1 originEdge = (EdgeStructV1) origin;
 +            target = new EdgeMapping(originEdge.sourceFields(),
 +                                     originEdge.unfoldSource(),
 +                                     originEdge.targetFields(),
 +                                     originEdge.unfoldTarget());
 +        }
 +        fill(origin, target);
 +        return target;
 +    }
 +
 +    private static void fill(ElementStructV1 originStruct,
 +                             ElementMapping targetStruct) {
 +        targetStruct.label(originStruct.label());
 +        targetStruct.skip(originStruct.skip());
 +        targetStruct.mappingFields(originStruct.mappingFields());
 +        targetStruct.mappingValues(originStruct.mappingValues());
 +        targetStruct.selectedFields(originStruct.selectedFields());
 +        targetStruct.ignoredFields(originStruct.ignoredFields());
 +        targetStruct.nullValues(originStruct.nullValues());
 +        targetStruct.updateStrategies(originStruct.updateStrategies());
 +    }
 +
 +    private static class FileSourceKey {
 +
 +        private final SourceType type;
 +        private final String path;
 +
 +        public FileSourceKey(SourceType type, String path) {
 +            this.type = type;
 +            this.path = path;
 +        }
 +
 +        @Override
 +        public boolean equals(Object object) {
 +            if (!(object instanceof FileSourceKey)) {
 +                return false;
 +            }
 +            FileSourceKey other = (FileSourceKey) object;
 +            return this.type == other.type && this.path.equals(other.path);
 +        }
 +
 +        @Override
 +        public int hashCode() {
 +            return this.type.hashCode() ^ this.path.hashCode();
 +        }
 +    }
 +}
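
A hedged sketch of the version dispatch in MappingUtil.parse(): a document
without a "version" field is treated as the legacy v1 struct format and
converted through parseV1(), while "2.0" documents are deserialized directly.
The JSON literal below is illustrative only.

    // Sketch only: no "version" field, so this goes through the v1 -> v2
    // conversion path and comes back as a (v2) LoadMapping instance.
    LoadMapping converted = MappingUtil.parse("{\"vertices\": [], \"edges\": []}");
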
diff --cc hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/MappingConverterTest.java
index 7f1dee26,00000000..fc71c74c
mode 100644,000000..100644
--- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/MappingConverterTest.java
+++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/unit/MappingConverterTest.java
@@@ -1,125 -1,0 +1,139 @@@
 +/*
 + * Copyright 2017 HugeGraph Authors
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership. The ASF
 + * licenses this file to You under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.hugegraph.loader.test.unit;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.charset.Charset;
 +import java.nio.charset.StandardCharsets;
 +
 +import org.apache.commons.io.FileUtils;
 +import org.apache.hugegraph.loader.MappingConverter;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +public class MappingConverterTest {
 +
 +    @Test
 +    public void testConvertV1ToV2() throws IOException {
 +        String v1Json = "{" +
 +                "  \"vertices\": [" +
 +                "    {" +
 +                "      \"label\": \"user\"," +
 +                "      \"input\": {" +
 +                "        \"type\": \"file\"," +
 +                "        \"path\": \"users.dat\"," +
 +                "        \"format\": \"TEXT\"," +
 +                "        \"delimiter\": \"::\"," +
 +                "        \"header\": [\"UserID\", \"Gender\", \"Age\", " +
 +                "\"Occupation\", \"Zip-code\"]" +
 +                "      }," +
 +                "      \"ignored\": [\"Gender\", \"Age\", \"Occupation\", " +
 +                "\"Zip-code\"]," +
 +                "      \"field_mapping\": {" +
 +                "        \"UserID\": \"id\"" +
 +                "      }" +
 +                "    }" +
 +                "  ]," +
 +                "  \"edges\": [" +
 +                "    {" +
 +                "      \"label\": \"rating\"," +
 +                "      \"source\": [\"UserID\"]," +
 +                "      \"target\": [\"MovieID\"]," +
 +                "      \"input\": {" +
 +                "        \"type\": \"file\"," +
 +                "        \"path\": \"ratings.dat\"," +
 +                "        \"format\": \"TEXT\"," +
 +                "        \"delimiter\": \"::\"," +
 +                "        \"header\": [\"UserID\", \"MovieID\", \"Rating\", " +
 +                "\"Timestamp\"]" +
 +                "      }," +
 +                "      \"ignored\": [\"Timestamp\"]," +
 +                "      \"field_mapping\": {" +
 +                "        \"UserID\": \"id\"," +
 +                "        \"MovieID\": \"id\"," +
 +                "        \"Rating\": \"rate\"" +
 +                "      }" +
 +                "    }" +
-                 "  ]" +
++                "  ]," +
++                "  \"backendStoreInfo\":" +
++                "  {" +
++                "    \"edge_tablename\": \"hugegraph:g_oe\"," +
++                "    \"vertex_tablename\": \"hugegraph:g_v\"," +
++                "    \"hbase_zookeeper_quorum\": \"127.0.0.1\"," +
++                "    \"hbase_zookeeper_property_clientPort\": \"2181\"," +
++                "    \"zookeeper_znode_parent\": \"/hbase\"" +
++                "  }" +
 +                "}";
 +        String input = "struct.json";
 +        File inputFile = new File(input);
 +        Charset charset = StandardCharsets.UTF_8;
 +        FileUtils.writeStringToFile(inputFile, v1Json, charset);
 +        MappingConverter.main(new String[]{input});
 +
 +        File outputFile = FileUtils.getFile("struct-v2.json");
 +        String actualV2Json = FileUtils.readFileToString(outputFile, charset);
 +        String expectV2Json = "{\"version\":\"2.0\"," +
 +                "\"structs\":[{\"id\":\"1\",\"skip\":false," +
 +                "\"input\":{\"type\":\"FILE\",\"path\":\"users.dat\"," +
 +                "\"file_filter\":{\"extensions\":[\"*\"]}," +
 +                "\"format\":\"TEXT\",\"delimiter\":\"::\"," +
 +                "\"date_format\":\"yyyy-MM-dd HH:mm:ss\"," +
 +                "\"time_zone\":\"GMT+8\",\"skipped_line\":{\"regex\":\"" +
 +                "(^#|^//).*|\"},\"compression\":\"NONE\"," +
 +                "\"batch_size\":500,\"header\":[\"UserID\",\"Gender\"," +
 +                "\"Age\",\"Occupation\",\"Zip-code\"]," +
 +                "\"charset\":\"UTF-8\",\"list_format\":null}," +
 +                "\"vertices\":[{\"label\":\"user\",\"skip\":false," +
 +                "\"id\":null,\"unfold\":false," +
 +                "\"field_mapping\":{\"UserID\":\"id\"}," +
 +                "\"value_mapping\":{},\"selected\":[]," +
 +                "\"ignored\":[\"Occupation\",\"Zip-code\",\"Gender\"," +
 +                "\"Age\"],\"null_values\":[\"\"]," +
 +                "\"update_strategies\":{},\"batch_size\":500}],\"edges\":[]},{\"id\":\"2\"," +
 +                "\"skip\":false,\"input\":{\"type\":\"FILE\"," +
 +                "\"path\":\"ratings.dat\"," +
 +                "\"file_filter\":{\"extensions\":[\"*\"]}," +
 +                "\"format\":\"TEXT\",\"delimiter\":\"::\"," +
 +                "\"date_format\":\"yyyy-MM-dd HH:mm:ss\"," +
 +                "\"time_zone\":\"GMT+8\",\"skipped_line\":{\"regex\":\"" +
 +                "(^#|^//).*|\"},\"compression\":\"NONE\"," +
 +                "\"batch_size\":500,\"header\":[\"UserID\",\"MovieID\"," +
 +                "\"Rating\",\"Timestamp\"],\"charset\":\"UTF-8\"," +
 +                "\"list_format\":null},\"vertices\":[]," +
 +                "\"edges\":[{\"label\":\"rating\",\"skip\":false," +
 +                "\"source\":[\"UserID\"],\"unfold_source\":false," +
 +                "\"target\":[\"MovieID\"],\"unfold_target\":false," +
 +                "\"field_mapping\":{\"UserID\":\"id\",\"MovieID\":\"id\"," +
 +                "\"Rating\":\"rate\"},\"value_mapping\":{},\"selected\":[]," +
 +                "\"ignored\":[\"Timestamp\"],\"null_values\":[\"\"]," +
-                 "\"update_strategies\":{},\"batch_size\":500}]}]}";
++                "\"update_strategies\":{},\"batch_size\":500}]}]," +
++                "\"backendStoreInfo\":{" +
++                "\"edge_tablename\":\"hugegraph:g_oe\"," +
++                "\"vertex_tablename\":\"hugegraph:g_v\"," +
++                "\"hbase_zookeeper_quorum\":\"127.0.0.1\"," +
++                "\"hbase_zookeeper_property_clientPort\":\"2181\"," +
++                "\"zookeeper_znode_parent\":\"/hbase\"}}";
 +        Assert.assertEquals(expectV2Json, actualV2Json);
 +
 +        FileUtils.forceDelete(inputFile);
 +        FileUtils.forceDelete(outputFile);
 +    }
 +}