You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2017/08/30 06:11:20 UTC

[2/2] hbase git commit: HBASE-15806 An endpoint-based export tool

HBASE-15806 An endpoint-based export tool


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/74659730
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/74659730
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/74659730

Branch: refs/heads/master
Commit: 7465973068f45a78d66394cad7c0858bfeda1b46
Parents: 314d357
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Wed Aug 30 14:06:04 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Wed Aug 30 14:06:04 2017 +0800

----------------------------------------------------------------------
 hbase-endpoint/pom.xml                          |  17 +
 .../apache/hadoop/hbase/coprocessor/Export.java | 538 +++++++++++++++++++
 hbase-endpoint/src/main/protobuf/Export.proto   |  45 ++
 .../hbase/coprocessor/TestImportExport.java     |  56 ++
 .../hbase/coprocessor/TestSecureExport.java     | 439 +++++++++++++++
 .../client/example/ExportEndpointExample.java   |  86 +++
 .../apache/hadoop/hbase/mapreduce/Export.java   | 131 +----
 .../hadoop/hbase/mapreduce/ExportUtils.java     | 175 ++++++
 .../hbase/mapreduce/TestImportExport.java       | 202 ++++---
 .../hbase/security/access/SecureTestUtil.java   |   2 +-
 10 files changed, 1513 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-endpoint/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-endpoint/pom.xml b/hbase-endpoint/pom.xml
index 29bd33b..e37272f 100644
--- a/hbase-endpoint/pom.xml
+++ b/hbase-endpoint/pom.xml
@@ -166,6 +166,23 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <!-- The coprocessor.Export needs mapreduce.Import and mapreduce.Export to run the unit tests -->
+    <!-- see org.apache.hadoop.hbase.coprocessor.TestImportExport -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-mapreduce</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-mapreduce</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <scope>test</scope>
+    </dependency>
     <!-- General dependencies -->
     <dependency>
       <groupId>commons-logging</groupId>

http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java
new file mode 100644
index 0000000..7a7cc00
--- /dev/null
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import java.io.Closeable;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.mapreduce.ExportUtils;
+import org.apache.hadoop.hbase.mapreduce.Import;
+import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken;
+import org.apache.hadoop.hbase.protobuf.generated.ExportProtos;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.FsDelegationToken;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.util.ArrayUtils;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Triple;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Export an HBase table. Writes content to sequence files up in HDFS. Use
+ * {@link Import} to read it back in again. It is implemented by the endpoint
+ * technique.
+ *
+ * @see org.apache.hadoop.hbase.mapreduce.Export
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public class Export extends ExportProtos.ExportService
+        implements Coprocessor, CoprocessorService {
+
+  private static final Log LOG = LogFactory.getLog(Export.class);
+  private static final Class<? extends CompressionCodec> DEFAULT_CODEC = DefaultCodec.class;
+  private static final SequenceFile.CompressionType DEFAULT_TYPE = SequenceFile.CompressionType.RECORD;
+  private RegionCoprocessorEnvironment env = null;
+  private UserProvider userProvider;
+
+  /**
+   * Command-line entry point. Runs the export with a freshly created HBase
+   * configuration and terminates the JVM with {@code 0} when a response map was
+   * produced, or {@code -1} when {@code run} returned {@code null}.
+   *
+   * @param args the command-line arguments
+   * @throws Throwable if the export fails
+   */
+  public static void main(String[] args) throws Throwable {
+    final Map<byte[], Response> exported = run(HBaseConfiguration.create(), args);
+    final int exitCode = (exported != null) ? 0 : -1;
+    System.exit(exitCode);
+  }
+
+  /**
+   * Parses the command line and runs the export.
+   * The generic Hadoop options are consumed by {@link GenericOptionsParser} first;
+   * only the remaining arguments are validated and interpreted, so that generic
+   * options cannot be mistaken for export arguments.
+   *
+   * @param conf the configuration, updated by the generic options found in {@code args}
+   * @param args the raw command-line arguments
+   * @return the responses of the export, or {@code null} if the remaining arguments are invalid
+   * @throws Throwable if the export fails
+   */
+  @VisibleForTesting
+  static Map<byte[], Response> run(final Configuration conf, final String[] args) throws Throwable {
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    // Validate what is left after the generic options were stripped; the usage
+    // message and the argument parsing below both work on otherArgs.
+    if (!ExportUtils.isValidArguements(otherArgs)) {
+      ExportUtils.usage("Wrong number of arguments: " + ArrayUtils.length(otherArgs));
+      return null;
+    }
+    Triple<TableName, Scan, Path> arguments = ExportUtils.getArgumentsFromCommandLine(conf, otherArgs);
+    return run(conf, arguments.getFirst(), arguments.getSecond(), arguments.getThird());
+  }
+
+  /**
+   * Exports the rows selected by {@code scan} from {@code tableName} into {@code dir}
+   * by invoking the export endpoint through {@link Table#coprocessorService}.
+   * The output directory must not exist; it is created up front and removed again
+   * if the endpoint invocation fails.
+   *
+   * @param conf the configuration used for the file system and the connection
+   * @param tableName the table to export
+   * @param scan the scan describing the data to export; its start/stop rows select the regions
+   * @param dir the output directory
+   * @return the responses keyed by the keys returned from the coprocessor call
+   *         (presumably region names - confirm against Table#coprocessorService)
+   * @throws Throwable if the export fails
+   */
+  public static Map<byte[], Response> run(final Configuration conf, TableName tableName, Scan scan, Path dir) throws Throwable {
+    FileSystem fs = dir.getFileSystem(conf);
+    UserProvider userProvider = UserProvider.instantiate(conf);
+    checkDir(fs, dir);
+    FsDelegationToken fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
+    fsDelegationToken.acquireDelegationToken(fs);
+    try {
+      final ExportProtos.ExportRequest request = getConfiguredRequest(conf, dir,
+        scan, fsDelegationToken.getUserToken());
+      try (Connection con = ConnectionFactory.createConnection(conf);
+              Table table = con.getTable(tableName)) {
+        Map<byte[], Response> result = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+        table.coprocessorService(ExportProtos.ExportService.class,
+          scan.getStartRow(),
+          scan.getStopRow(),
+          (ExportProtos.ExportService service) -> {
+            ServerRpcController controller = new ServerRpcController();
+            CoprocessorRpcUtils.BlockingRpcCallback<ExportProtos.ExportResponse>
+              rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
+            service.export(controller, request, rpcCallback);
+            if (controller.failedOnException()) {
+              throw controller.getFailedOn();
+            }
+            return rpcCallback.get();
+          }).forEach((k, v) -> result.put(k, new Response(v)));
+        return result;
+      } catch (Throwable e) {
+        // Remove the partial output, but never let a cleanup failure hide the
+        // error that actually aborted the export.
+        try {
+          fs.delete(dir, true);
+        } catch (IOException | RuntimeException cleanupFailure) {
+          e.addSuppressed(cleanupFailure);
+        }
+        throw e;
+      }
+    } finally {
+      fsDelegationToken.releaseDelegationToken();
+    }
+  }
+
+  /**
+   * Tells whether the request asks for compressed output.
+   *
+   * @param request the export request
+   * @return the {@code compressed} flag of the request, or {@code false} when it is not set
+   */
+  private static boolean getCompression(final ExportProtos.ExportRequest request) {
+    return request.hasCompressed() && request.getCompressed();
+  }
+
+  /**
+   * Resolves the sequence-file compression type named in the request.
+   *
+   * @param request the export request
+   * @return the requested compression type, or {@code DEFAULT_TYPE} when the request names none
+   */
+  private static SequenceFile.CompressionType getCompressionType(final ExportProtos.ExportRequest request) {
+    return request.hasCompressType()
+            ? SequenceFile.CompressionType.valueOf(request.getCompressType())
+            : DEFAULT_TYPE;
+  }
+
+  /**
+   * Instantiates the compression codec named in the request.
+   *
+   * @param conf the configuration used to load and configure the codec class
+   * @param request the export request
+   * @return an instance of the requested codec, or of {@code DEFAULT_CODEC} when none is named
+   * @throws IllegalArgumentException if the named codec class cannot be found
+   */
+  private static CompressionCodec getCompressionCodec(final Configuration conf, final ExportProtos.ExportRequest request) {
+    Class<? extends CompressionCodec> codecClass;
+    if (!request.hasCompressCodec()) {
+      codecClass = DEFAULT_CODEC;
+    } else {
+      try {
+        Class<?> loaded = conf.getClassByName(request.getCompressCodec());
+        codecClass = loaded.asSubclass(CompressionCodec.class);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalArgumentException("Compression codec "
+                + request.getCompressCodec() + " was not found.", e);
+      }
+    }
+    return ReflectionUtils.newInstance(codecClass, conf);
+  }
+
+  private static SequenceFile.Writer.Option getOutputPath(final Configuration conf,
+          final HRegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
+    Path file = new Path(request.getOutputPath(), "export-" + info.getEncodedName());
+    FileSystem fs = file.getFileSystem(conf);
+    if (fs.exists(file)) {
+      throw new IOException(file + " exists");
+    }
+    return SequenceFile.Writer.file(file);
+  }
+
+  private static List<SequenceFile.Writer.Option> getWriterOptions(final Configuration conf,
+          final HRegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
+    List<SequenceFile.Writer.Option> rval = new LinkedList<>();
+    rval.add(SequenceFile.Writer.keyClass(ImmutableBytesWritable.class));
+    rval.add(SequenceFile.Writer.valueClass(Result.class));
+    rval.add(getOutputPath(conf, info, request));
+    if (getCompression(request)) {
+      rval.add(SequenceFile.Writer.compression(getCompressionType(request), getCompressionCodec(conf, request)));
+    } else {
+      rval.add(SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
+    }
+    return rval;
+  }
+
+  /**
+   * Scans the region and appends every row to a sequence file.
+   * The scanner is opened, advanced and closed through {@link ScanCoprocessor},
+   * so the scanner hooks of the region's coprocessor host (if any) are invoked.
+   *
+   * @param region the region to export
+   * @param conf the configuration used to create the writer
+   * @param userProvider used to find the user on whose behalf the data is written
+   * @param scan the scan to run against the region
+   * @param userToken the token passed to the writer's user; may be {@code null}
+   * @param opts the options for the sequence-file writer
+   * @return the number of rows and cells written
+   * @throws IOException if scanning or writing fails, or if a single
+   *         {@code nextRaw} call returns cells of more than one row
+   */
+  private static ExportProtos.ExportResponse processData(final Region region, final Configuration conf,
+    final UserProvider userProvider, final Scan scan, final Token userToken,
+    final List<SequenceFile.Writer.Option> opts) throws IOException {
+    ScanCoprocessor cp = new ScanCoprocessor(region);
+    RegionScanner scanner = null;
+    // The region operation and the writer are released by try-with-resources;
+    // the scanner is closed in the finally block below.
+    try (RegionOp regionOp = new RegionOp(region);
+            SecureWriter out = new SecureWriter(conf, userProvider, userToken, opts)) {
+      scanner = cp.checkScannerOpen(scan);
+      ImmutableBytesWritable key = new ImmutableBytesWritable();
+      long rowCount = 0;
+      long cellCount = 0;
+      List<Result> results = new ArrayList<>();
+      List<Cell> cells = new ArrayList<>();
+      boolean hasMore;
+      do {
+        boolean bypass = cp.preScannerNext(scanner, results, scan.getBatch());
+        if (bypass) {
+          // A coprocessor took over this step: stop after writing whatever it
+          // placed in results.
+          hasMore = false;
+        } else {
+          hasMore = scanner.nextRaw(cells);
+          if (cells.isEmpty()) {
+            // Nothing returned by this call; go straight to the loop condition.
+            continue;
+          }
+          // Sanity check: all cells returned by one nextRaw call must share a row.
+          Cell firstCell = cells.get(0);
+          for (Cell cell : cells) {
+            if (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength(),
+                    cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) != 0) {
+              throw new IOException("Why the RegionScanner#nextRaw returns the data of different rows??"
+                      + " first row=" + Bytes.toHex(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength())
+                      + ", current row=" + Bytes.toHex(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+            }
+          }
+          results.add(Result.create(cells));
+          cells.clear();
+          // The value returned by the post hook is not used to update hasMore.
+          cp.postScannerNext(scanner, results, scan.getBatch(), hasMore);
+        }
+        // Write the collected results, keyed by their row.
+        for (Result r : results) {
+          key.set(r.getRow());
+          out.append(key, r);
+          ++rowCount;
+          cellCount += r.size();
+        }
+        results.clear();
+      } while (hasMore);
+      return ExportProtos.ExportResponse.newBuilder()
+              .setRowCount(rowCount)
+              .setCellCount(cellCount)
+              .build();
+    } finally {
+      // scanner is still null here if opening it (or creating the resources) failed;
+      // checkScannerClose ignores null.
+      cp.checkScannerClose(scanner);
+    }
+  }
+
+  private static void checkDir(final FileSystem fs, final Path dir) throws IOException {
+    if (fs.exists(dir)) {
+      throw new RuntimeException("The " + dir + " exists");
+    }
+    if (!fs.mkdirs(dir)) {
+      throw new IOException("Failed to create the " + dir);
+    }
+  }
+
+  private static ExportProtos.ExportRequest getConfiguredRequest(Configuration conf,
+          Path dir, final Scan scan, final Token<?> userToken) throws IOException {
+    boolean compressed = conf.getBoolean(FileOutputFormat.COMPRESS, false);
+    String compressionType = conf.get(FileOutputFormat.COMPRESS_TYPE,
+            DEFAULT_TYPE.toString());
+    String compressionCodec = conf.get(FileOutputFormat.COMPRESS_CODEC,
+            DEFAULT_CODEC.getName());
+    DelegationToken protoToken = null;
+    if (userToken != null) {
+      protoToken = DelegationToken.newBuilder()
+              .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
+              .setPassword(ByteStringer.wrap(userToken.getPassword()))
+              .setKind(userToken.getKind().toString())
+              .setService(userToken.getService().toString()).build();
+    }
+    LOG.info("compressed=" + compressed
+            + ", compression type=" + compressionType
+            + ", compression codec=" + compressionCodec
+            + ", userToken=" + userToken);
+    ExportProtos.ExportRequest.Builder builder = ExportProtos.ExportRequest.newBuilder()
+            .setScan(ProtobufUtil.toScan(scan))
+            .setOutputPath(dir.toString())
+            .setCompressed(compressed)
+            .setCompressCodec(compressionCodec)
+            .setCompressType(compressionType);
+    if (protoToken != null) {
+      builder.setFsToken(protoToken);
+    }
+    return builder.build();
+  }
+
+
+  /**
+   * Remembers the region environment and creates the user provider from its configuration.
+   *
+   * @param environment the environment this coprocessor is loaded into
+   * @throws IOException ({@code CoprocessorException}) if the environment is not a region environment
+   */
+  @Override
+  public void start(CoprocessorEnvironment environment) throws IOException {
+    if (!(environment instanceof RegionCoprocessorEnvironment)) {
+      throw new CoprocessorException("Must be loaded on a table region!");
+    }
+    env = (RegionCoprocessorEnvironment) environment;
+    userProvider = UserProvider.instantiate(env.getConfiguration());
+  }
+
+  /**
+   * Does nothing; this coprocessor holds no resources that need releasing.
+   * Note that the parameter shadows the {@code env} field.
+   */
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+  }
+
+  /**
+   * Returns this object as the protobuf service to expose; the class itself
+   * extends the generated {@code ExportProtos.ExportService}.
+   */
+  @Override
+  public Service getService() {
+    return this;
+  }
+
+  /**
+   * Endpoint implementation: exports the data of the hosting region to a
+   * sequence file under the output path of the request.
+   * On success the row/cell counts are handed to {@code done}; on an
+   * {@link IOException} the exception is stored in the controller and
+   * {@code done} is not invoked.
+   *
+   * @param controller receives the exception if the export fails
+   * @param request the scan, output path, compression settings and optional file-system token
+   * @param done the callback receiving the response
+   */
+  @Override
+  public void export(RpcController controller, ExportProtos.ExportRequest request,
+          RpcCallback<ExportProtos.ExportResponse> done) {
+    Region region = env.getRegion();
+    // Work on a copy of the configuration so that registering the Result
+    // serialization does not modify the environment's configuration.
+    Configuration conf = HBaseConfiguration.create(env.getConfiguration());
+    conf.setStrings("io.serializations", conf.get("io.serializations"), ResultSerialization.class.getName());
+    try {
+      Scan scan = validateKey(region.getRegionInfo(), request);
+      Token userToken = null;
+      if (userProvider.isHadoopSecurityEnabled() && !request.hasFsToken()) {
+        LOG.warn("Hadoop security is enable, but no found of user token");
+      } else if (userProvider.isHadoopSecurityEnabled()) {
+        userToken = new Token(request.getFsToken().getIdentifier().toByteArray(),
+                request.getFsToken().getPassword().toByteArray(),
+                new Text(request.getFsToken().getKind()),
+                new Text(request.getFsToken().getService()));
+      }
+      ExportProtos.ExportResponse response = processData(region, conf, userProvider,
+        scan, userToken, getWriterOptions(conf, region.getRegionInfo(), request));
+      done.run(response);
+    } catch (IOException e) {
+      CoprocessorRpcUtils.setControllerException(controller, e);
+      // Pass the exception as the throwable argument so the stack trace is logged,
+      // not just its toString().
+      LOG.error("Failed to export the region " + region.getRegionInfo().getEncodedName(), e);
+    }
+  }
+
+  /**
+   * Rebuilds the scan from the request and clamps its row range to the range of
+   * the given region.
+   * The start row is raised to the region's start key when it lies before it.
+   * The stop row is lowered to the region's end key when the scan has no stop
+   * row or one beyond the region. An empty key is treated as "unbounded"
+   * (HBase convention), so the stop row requested by the user is kept untouched
+   * on a region without an end key.
+   *
+   * @param region the region hosting this endpoint
+   * @param request the export request carrying the scan
+   * @return the scan restricted to the region
+   * @throws IOException if the scan cannot be converted
+   */
+  private Scan validateKey(final HRegionInfo region, final ExportProtos.ExportRequest request) throws IOException {
+    Scan scan = ProtobufUtil.toScan(request.getScan());
+    byte[] regionStartKey = region.getStartKey();
+    byte[] originStartKey = scan.getStartRow();
+    if (originStartKey == null
+            || Bytes.compareTo(originStartKey, regionStartKey) < 0) {
+      scan.setStartRow(regionStartKey);
+    }
+    byte[] regionEndKey = region.getEndKey();
+    byte[] originEndKey = scan.getStopRow();
+    boolean regionHasEnd = regionEndKey != null && regionEndKey.length > 0;
+    boolean scanHasEnd = originEndKey != null && originEndKey.length > 0;
+    // Only the stop row may be lowered here; touching the start row would
+    // discard the lower bound computed above.
+    if (regionHasEnd
+            && (!scanHasEnd || Bytes.compareTo(originEndKey, regionEndKey) > 0)) {
+      scan.setStopRow(regionEndKey);
+    }
+    return scan;
+  }
+
+  /**
+   * Brackets a region operation so it can be used in try-with-resources:
+   * the constructor calls {@code startRegionOperation()} and {@link #close()}
+   * calls {@code closeRegionOperation()}.
+   */
+  private static class RegionOp implements Closeable {
+
+    private final Region region;
+
+    /**
+     * Starts a region operation on the given region.
+     *
+     * @throws IOException if the region refuses to start the operation
+     */
+    RegionOp(final Region region) throws IOException {
+      this.region = region;
+      region.startRegionOperation();
+    }
+
+    /** Ends the region operation started by the constructor. */
+    @Override
+    public void close() throws IOException {
+      region.closeRegionOperation();
+    }
+  }
+
+  /**
+   * Wraps the scanner life cycle of a region with the pre/post scanner hooks of
+   * the region's coprocessor host. Every method falls back to the plain
+   * behavior when the region has no coprocessor host.
+   */
+  private static class ScanCoprocessor {
+
+    private final Region region;
+
+    ScanCoprocessor(final Region region) {
+      this.region = region;
+    }
+
+    /**
+     * Opens a scanner on the region, running the preScannerOpen and
+     * postScannerOpen hooks when a coprocessor host is present.
+     *
+     * @param scan the scan to open
+     * @return the scanner, never {@code null}
+     * @throws IOException if no scanner could be obtained
+     */
+    RegionScanner checkScannerOpen(final Scan scan) throws IOException {
+      RegionScanner scanner;
+      if (region.getCoprocessorHost() == null) {
+        scanner = region.getScanner(scan);
+      } else {
+        // The pre hook may supply its own scanner; otherwise use the region's.
+        scanner = region.getCoprocessorHost().preScannerOpen(scan);
+        if (scanner == null) {
+          scanner = region.getScanner(scan);
+        }
+        // The post hook may replace the scanner.
+        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
+      }
+      if (scanner == null) {
+        throw new IOException("Failed to open region scanner");
+      }
+      return scanner;
+    }
+
+    /**
+     * Closes the scanner, running the preScannerClose and postScannerClose hooks
+     * when a coprocessor host is present. A {@code null} scanner is ignored.
+     *
+     * @param s the scanner to close; may be {@code null}
+     */
+    void checkScannerClose(final InternalScanner s) throws IOException {
+      if (s == null) {
+        return;
+      }
+      if (region.getCoprocessorHost() == null) {
+        s.close();
+        return;
+      }
+      // When the pre hook returns true the scanner is not closed here and the
+      // post hook is skipped.
+      if (region.getCoprocessorHost().preScannerClose(s)) {
+        return;
+      }
+      try {
+        s.close();
+      } finally {
+        region.getCoprocessorHost().postScannerClose(s);
+      }
+    }
+
+    /**
+     * Runs the preScannerNext hook.
+     *
+     * @return the value returned by the hook, or {@code false} when there is no
+     *         coprocessor host or the hook returns {@code null}
+     */
+    boolean preScannerNext(final InternalScanner s,
+            final List<Result> results, final int limit) throws IOException {
+      if (region.getCoprocessorHost() == null) {
+        return false;
+      } else {
+        Boolean bypass = region.getCoprocessorHost().preScannerNext(s, results, limit);
+        return bypass == null ? false : bypass;
+      }
+    }
+
+    /**
+     * Runs the postScannerNext hook.
+     *
+     * @return the value returned by the hook, or {@code false} when there is no
+     *         coprocessor host
+     */
+    boolean postScannerNext(final InternalScanner s,
+            final List<Result> results, final int limit, boolean hasMore)
+            throws IOException {
+      if (region.getCoprocessorHost() == null) {
+        return false;
+      } else {
+        return region.getCoprocessorHost().postScannerNext(s, results, limit, hasMore);
+      }
+    }
+  }
+
+  /**
+   * A closeable sequence-file writer whose appends are delegated to a
+   * {@link PrivilegedWriter} bound to the active user.
+   */
+  private static class SecureWriter implements Closeable {
+    private final PrivilegedWriter privilegedWriter;
+
+    /**
+     * Creates the underlying {@code SequenceFile.Writer} from the given options
+     * and binds it to the active user (see {@link #getActiveUser}).
+     *
+     * @param conf the configuration used to create the writer
+     * @param userProvider the fallback source of the user
+     * @param userToken the token added to the user; may be {@code null}
+     * @param opts the sequence-file writer options
+     * @throws IOException if the user cannot be resolved or the writer cannot be created
+     */
+    SecureWriter(final Configuration conf, final UserProvider userProvider, final Token userToken,
+            final List<SequenceFile.Writer.Option> opts) throws IOException {
+      privilegedWriter = new PrivilegedWriter(getActiveUser(userProvider, userToken),
+        SequenceFile.createWriter(conf, opts.toArray(new SequenceFile.Writer.Option[opts.size()])));
+    }
+
+    /** Appends one key/value pair through the privileged writer. */
+    void append(final Object key, final Object value) throws IOException {
+      privilegedWriter.append(key, value);
+    }
+
+    /**
+     * Picks the user to write as: the user of the current RPC request, or the
+     * current user of the provider when there is none. If a user and a token
+     * are both available, the token is added to the user.
+     *
+     * @return the user; {@code null} if neither source yields one
+     */
+    private static User getActiveUser(final UserProvider userProvider, final Token userToken) throws IOException {
+      User user = RpcServer.getRequestUser();
+      if (user == null) {
+        user = userProvider.getCurrent();
+      }
+      if (user == null && userToken != null) {
+        LOG.warn("No found of user credentials, but a token was got from user request");
+      } else if (user != null && userToken != null) {
+        user.addToken(userToken);
+      }
+      return user;
+    }
+
+    /** Closes the underlying writer. */
+    @Override
+    public void close() throws IOException {
+      privilegedWriter.close();
+    }
+  }
+
+  /**
+   * Appends records to a {@code SequenceFile.Writer}, running each append as the
+   * given user when there is one. The pending key/value pair is parked in fields
+   * so that {@link #run()} can pick it up; instances are therefore meant to be
+   * used by a single thread at a time.
+   */
+  private static class PrivilegedWriter implements PrivilegedExceptionAction<Boolean>, Closeable {
+    private final User user;
+    private final SequenceFile.Writer out;
+    private Object key;
+    private Object value;
+
+    /**
+     * @param user the user to append as; {@code null} to append directly
+     * @param out the writer receiving the records
+     */
+    PrivilegedWriter(final User user, final SequenceFile.Writer out) {
+      this.user = user;
+      this.out = out;
+    }
+
+    /**
+     * Appends one record, as the configured user if any.
+     *
+     * @throws IOException if the append fails or the thread is interrupted while appending
+     */
+    void append(final Object key, final Object value) throws IOException {
+      if (user == null) {
+        out.append(key, value);
+      } else {
+        this.key = key;
+        this.value = value;
+        try {
+          user.runAs(this);
+        } catch (InterruptedException ex) {
+          // Restore the interrupt flag so callers further up can still observe it.
+          Thread.currentThread().interrupt();
+          throw new IOException(ex);
+        } finally {
+          // Do not keep the last record reachable between appends.
+          this.key = null;
+          this.value = null;
+        }
+      }
+    }
+
+    /** Appends the record parked by {@link #append(Object, Object)}. */
+    @Override
+    public Boolean run() throws Exception {
+      out.append(key, value);
+      return true;
+    }
+
+    /** Closes the underlying writer. */
+    @Override
+    public void close() throws IOException {
+      out.close();
+    }
+  }
+
+  /**
+   * Immutable client-side view of an export response: the number of rows and
+   * cells that were written.
+   */
+  public static class Response {
+
+    private final long rowCount;
+    private final long cellCount;
+
+    private Response(ExportProtos.ExportResponse r) {
+      rowCount = r.getRowCount();
+      cellCount = r.getCellCount();
+    }
+
+    /** @return the number of exported rows */
+    public long getRowCount() {
+      return rowCount;
+    }
+
+    /** @return the number of exported cells */
+    public long getCellCount() {
+      return cellCount;
+    }
+
+    @Override
+    public String toString() {
+      return "rowCount=" + rowCount + ", cellCount=" + cellCount;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-endpoint/src/main/protobuf/Export.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/Export.proto b/hbase-endpoint/src/main/protobuf/Export.proto
new file mode 100644
index 0000000..5e6c262
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/Export.proto
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "ExportProtos";
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+option java_generic_services = true;
+
+import "Client.proto";
+
+// Endpoint service exposing a single RPC that takes an ExportRequest and
+// returns an ExportResponse.
+service ExportService {
+  rpc export (ExportRequest) returns (ExportResponse);
+}
+
+// Parameters of an export.
+message ExportRequest {
+  // The scan selecting the data to export.
+  required Scan scan = 1;
+  // The directory that receives the exported files.
+  required string outputPath = 2;
+  // Whether the output is compressed; off by default.
+  optional bool compressed = 3 [default = false];
+  // Name of the compression type to use when compressed is set.
+  optional string compressType = 4;
+  // Class name of the compression codec to use when compressed is set.
+  optional string compressCodec = 5;
+  // Delegation token for the file system (DelegationToken comes from Client.proto).
+  optional DelegationToken fsToken = 6;
+}
+// Result of an export: how many rows and cells were written.
+message ExportResponse {
+  required uint64 rowCount = 1;
+  required uint64 cellCount = 2;
+}
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestImportExport.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestImportExport.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestImportExport.java
new file mode 100644
index 0000000..e0d4fd2
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestImportExport.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Runs the tests inherited from the mapreduce {@code TestImportExport} with the
+ * export step performed by the endpoint-based
+ * {@link org.apache.hadoop.hbase.coprocessor.Export} instead.
+ */
+@Category({MediumTests.class})
+public class TestImportExport extends org.apache.hadoop.hbase.mapreduce.TestImportExport {
+
+  /**
+   * Registers the Export endpoint as a region coprocessor in the test
+   * configuration, then runs the parent class setup.
+   */
+  @BeforeClass
+  public static void beforeClass() throws Throwable {
+    UTIL.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+      org.apache.hadoop.hbase.coprocessor.Export.class.getName());
+    org.apache.hadoop.hbase.mapreduce.TestImportExport.beforeClass();
+  }
+
+  /**
+   * Runs the endpoint export on a copy of the test configuration.
+   * Always reports success when no exception is thrown; the map returned by
+   * {@code Export.run} is not inspected.
+   */
+  @Override
+  protected boolean runExport(String[] args) throws Throwable {
+    Export.run(new Configuration(UTIL.getConfiguration()), args);
+    return true;
+  }
+
+  /** Invokes the command-line entry point of the endpoint export. */
+  @Override
+  protected void runExportMain(String[] args) throws Throwable {
+    Export.main(args);
+  }
+
+  /**
+   * Skip the test which is unrelated to the coprocessor.Export.
+   */
+  @Test
+  @Ignore
+  public void testImport94Table() throws Throwable {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java
new file mode 100644
index 0000000..66d99dd
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import com.google.protobuf.ServiceException;
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil;
+import org.apache.hadoop.hbase.mapreduce.ExportUtils;
+import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.mapreduce.Import;
+import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessControlConstants;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil.AccessTestAction;
+import org.apache.hadoop.hbase.security.access.Permission;
+import org.apache.hadoop.hbase.security.visibility.Authorizations;
+import org.apache.hadoop.hbase.security.visibility.CellVisibility;
+import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
+import org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
+import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.After;
+import org.junit.AfterClass;
+import static org.junit.Assert.assertEquals;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({MediumTests.class})
+public class TestSecureExport {
+  private static final Log LOG = LogFactory.getLog(TestSecureExport.class);
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  // Assigned in setUpKdcServer() and stopped in afterClass().
+  private static MiniKdc KDC;
+  // Keytab that receives every principal created in setUpKdcServer().
+  private static final File KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath());
+  private static String USERNAME;
+  private static String SERVER_PRINCIPAL;
+  private static String HTTP_PRINCIPAL;
+  // Column family of the exported tables.
+  private static final String FAMILYA_STRING = "fma";
+  // Column family of the imported table. It must differ from FAMILYA_STRING so that the
+  // Import.CF_RENAME_PROP mapping used in testVisibilityLabels is an actual rename.
+  private static final String FAMILYB_STRING = "fmb";
+  private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
+  private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
+  private static final byte[] ROW1 = Bytes.toBytes("row1");
+  private static final byte[] ROW2 = Bytes.toBytes("row2");
+  private static final byte[] ROW3 = Bytes.toBytes("row3");
+  private static final byte[] QUAL = Bytes.toBytes("qual");
+  private static final String LOCALHOST = "localhost";
+  // Timestamp shared by every cell the tests write.
+  private static final long NOW = System.currentTimeMillis();
+  // user granted with all global permission
+  private static final String USER_ADMIN = "admin";
+  // user is table owner. will have all permissions on table
+  private static final String USER_OWNER = "owner";
+  // user with rx permissions.
+  private static final String USER_RX = "rxuser";
+  // user with exe-only permissions.
+  private static final String USER_XO = "xouser";
+  // user with read-only permissions.
+  private static final String USER_RO = "rouser";
+  // user with no permissions
+  private static final String USER_NONE = "noneuser";
+  // Visibility labels registered in beforeClass().
+  private static final String PRIVATE = "private";
+  private static final String CONFIDENTIAL = "confidential";
+  private static final String SECRET = "secret";
+  private static final String TOPSECRET = "topsecret";
+  // Supplies the running test method's name for logging and for table names.
+  @Rule
+  public final TestName name = new TestName();
+  private static void setUpKdcServer() throws Exception {
+    Properties conf = MiniKdc.createConf();
+    conf.put(MiniKdc.DEBUG, true);
+    File kdcFile = new File(UTIL.getDataTestDir("kdc").toUri().getPath());
+    KDC = new MiniKdc(conf, kdcFile);
+    KDC.start();
+    USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
+    SERVER_PRINCIPAL = USERNAME + "/" + LOCALHOST;
+    HTTP_PRINCIPAL = "HTTP/" + LOCALHOST;
+    KDC.createPrincipal(KEYTAB_FILE,
+      SERVER_PRINCIPAL,
+      HTTP_PRINCIPAL,
+      USER_ADMIN + "/" + LOCALHOST,
+      USER_OWNER + "/" + LOCALHOST,
+      USER_RX + "/" + LOCALHOST,
+      USER_RO + "/" + LOCALHOST,
+      USER_XO + "/" + LOCALHOST,
+      USER_NONE + "/" + LOCALHOST);
+  }
+  /**
+   * Logs the given test user in from {@link #KEYTAB_FILE} and wraps the resulting UGI.
+   *
+   * @param user short name of the test user, e.g. {@link #USER_ADMIN}
+   * @return the logged-in user
+   * @throws IOException if the keytab login reports an error
+   */
+  private static User getUserByLogin(final String user) throws IOException {
+    return User.create(UserGroupInformation.loginUserFromKeytabAndReturnUGI(
+      getPrincipal(user), KEYTAB_FILE.getAbsolutePath()));
+  }
+
+  /**
+   * Builds the full principal name of a test user in the form
+   * {@code user/localhost@REALM}, using the realm of the running KDC.
+   */
+  private static String getPrincipal(final String user) {
+    return user + "/" + LOCALHOST + "@" + KDC.getRealm();
+  }
+  private static void setUpClusterKdc() throws Exception {
+    HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
+    HBaseKerberosUtils.setPrincipalForTesting(SERVER_PRINCIPAL + "@" + KDC.getRealm());
+    HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration());
+    // if we drop support for hadoop-2.4.0 and hadoop-2.4.1,
+    // the following key should be changed.
+    // 1) DFS_NAMENODE_USER_NAME_KEY -> DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY
+    // 2) DFS_DATANODE_USER_NAME_KEY -> DFS_DATANODE_KERBEROS_PRINCIPAL_KEY
+    UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, SERVER_PRINCIPAL + "@" + KDC.getRealm());
+    UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, SERVER_PRINCIPAL + "@" + KDC.getRealm());
+    UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
+    UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
+    // set yarn principal
+    UTIL.getConfiguration().set(YarnConfiguration.RM_PRINCIPAL, SERVER_PRINCIPAL + "@" + KDC.getRealm());
+    UTIL.getConfiguration().set(YarnConfiguration.NM_PRINCIPAL, SERVER_PRINCIPAL + "@" + KDC.getRealm());
+    UTIL.getConfiguration().set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, HTTP_PRINCIPAL + "@" + KDC.getRealm());
+    UTIL.getConfiguration().setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    UTIL.getConfiguration().set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+    UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, LOCALHOST + ":0");
+    UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, LOCALHOST + ":0");
+
+    File keystoresDir = new File(UTIL.getDataTestDir("keystore").toUri().getPath());
+    keystoresDir.mkdirs();
+    String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestSecureExport.class);
+    KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, UTIL.getConfiguration(), false);
+
+    UTIL.getConfiguration().setBoolean("ignore.secure.ports.for.testing", true);
+    UserGroupInformation.setConfiguration(UTIL.getConfiguration());
+    UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, UTIL.getConfiguration().get(
+      CoprocessorHost.REGION_COPROCESSOR_CONF_KEY) + "," + Export.class.getName());
+  }
+  private static void addLabels(final Configuration conf, final List<String> users, final List<String> labels) throws Exception {
+    PrivilegedExceptionAction<VisibilityLabelsProtos.VisibilityLabelsResponse> action
+      = () -> {
+        try (Connection conn = ConnectionFactory.createConnection(conf)) {
+          VisibilityClient.addLabels(conn, labels.toArray(new String[labels.size()]));
+          for (String user : users) {
+            VisibilityClient.setAuths(conn, labels.toArray(new String[labels.size()]), user);
+          }
+        } catch (Throwable t) {
+          throw new IOException(t);
+        }
+        return null;
+      };
+    getUserByLogin(USER_ADMIN).runAs(action);
+  }
+
+  /** Logs the name of the test method that is about to run. */
+  @Before
+  public void announce() {
+    final String testMethod = name.getMethodName();
+    LOG.info("Running " + testMethod);
+  }
+
+  /**
+   * Per-test teardown hook. It currently does nothing; the tests in this class delete
+   * the tables and the export output they create themselves.
+   */
+  @After
+  public void cleanup() throws IOException {
+  }
+  private static void clearOutput(Path path) throws IOException {
+    FileSystem fs = path.getFileSystem(UTIL.getConfiguration());
+    if (fs.exists(path)) {
+      assertEquals(true, fs.delete(path, true));
+    }
+  }
+  /**
+   * Starts the KDC and the secure mini cluster, grants {@link #USER_ADMIN} every global
+   * permission and registers the visibility labels for {@link #USER_OWNER}.
+   * The security is set up first in order to get the correct default realm.
+   *
+   * @throws Exception if the KDC or the mini cluster cannot be set up
+   */
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    final Configuration conf = UTIL.getConfiguration();
+    UserProvider.setUserProviderForTesting(conf, HadoopSecurityEnabledUserProviderForTesting.class);
+    setUpKdcServer();
+    SecureTestUtil.enableSecurity(conf);
+    conf.setBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY, true);
+    VisibilityTestUtil.enableVisiblityLabels(conf);
+    SecureTestUtil.verifyConfiguration(conf);
+    setUpClusterKdc();
+
+    UTIL.startMiniCluster();
+    // Wait for the ACL and labels tables before granting permissions and adding labels.
+    UTIL.waitUntilAllRegionsAssigned(AccessControlLists.ACL_TABLE_NAME);
+    UTIL.waitUntilAllRegionsAssigned(VisibilityConstants.LABELS_TABLE_NAME);
+    UTIL.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME, 50000);
+    UTIL.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME, 50000);
+
+    SecureTestUtil.grantGlobal(UTIL, USER_ADMIN,
+            Permission.Action.ADMIN,
+            Permission.Action.CREATE,
+            Permission.Action.EXEC,
+            Permission.Action.READ,
+            Permission.Action.WRITE);
+    final List<String> labelledUsers = Arrays.asList(USER_OWNER);
+    final List<String> allLabels = Arrays.asList(PRIVATE, CONFIDENTIAL, SECRET, TOPSECRET);
+    addLabels(conf, labelledUsers, allLabels);
+  }
+
+  /**
+   * Stops the KDC, if one was started, and shuts the mini cluster down. The cluster is
+   * shut down even when stopping the KDC throws.
+   */
+  @AfterClass
+  public static void afterClass() throws Exception {
+    try {
+      if (KDC != null) {
+        KDC.stop();
+      }
+    } finally {
+      UTIL.shutdownMiniCluster();
+    }
+  }
+
+  /**
+   * Test the ExportEndpoint's access levels. The MapReduce-based
+   * {@link org.apache.hadoop.hbase.mapreduce.Export} is not covered here
+   * since the access exceptions cannot be collected from the mappers.
+   * <p>
+   * Writing is verified to be allowed for the admin and the owner only; exporting is
+   * verified to be allowed for the admin, the owner and the read+exec user only.
+   *
+   * @throws Throwable if a verification fails or the cluster reports an unexpected error
+   */
+  @Test
+  public void testAccessCase() throws Throwable {
+    final String exportTable = name.getMethodName();
+    final TableName exportTableName = TableName.valueOf(exportTable);
+    TableDescriptor exportHtd = TableDescriptorBuilder
+            .newBuilder(exportTableName)
+            .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYA))
+            .setOwnerString(USER_OWNER)
+            .build();
+    SecureTestUtil.createTable(UTIL, exportHtd, new byte[][]{Bytes.toBytes("s")});
+    SecureTestUtil.grantOnTable(UTIL, USER_RO,
+            exportTableName, null, null,
+            Permission.Action.READ);
+    SecureTestUtil.grantOnTable(UTIL, USER_RX,
+            exportTableName, null, null,
+            Permission.Action.READ,
+            Permission.Action.EXEC);
+    SecureTestUtil.grantOnTable(UTIL, USER_XO,
+            exportTableName, null, null,
+            Permission.Action.EXEC);
+    // Presumably the owner's entry plus the three grants above.
+    assertEquals(4, AccessControlLists.getTablePermissions(UTIL.getConfiguration(),
+            exportTableName).size());
+    AccessTestAction putAction = () -> {
+      Put p = new Put(ROW1);
+      p.addColumn(FAMILYA, Bytes.toBytes("qual_0"), NOW, QUAL);
+      p.addColumn(FAMILYA, Bytes.toBytes("qual_1"), NOW, QUAL);
+      try (Connection conn = ConnectionFactory.createConnection(UTIL.getConfiguration());
+              Table t = conn.getTable(exportTableName)) {
+        t.put(p);
+      }
+      return null;
+    };
+    // no hdfs access.
+    SecureTestUtil.verifyAllowed(putAction,
+      getUserByLogin(USER_ADMIN),
+      getUserByLogin(USER_OWNER));
+    SecureTestUtil.verifyDenied(putAction,
+      getUserByLogin(USER_RO),
+      getUserByLogin(USER_XO),
+      getUserByLogin(USER_RX),
+      getUserByLogin(USER_NONE));
+
+    final FileSystem fs = UTIL.getDFSCluster().getFileSystem();
+    final Path openDir = fs.makeQualified(new Path("testAccessCase"));
+    fs.mkdirs(openDir);
+    try {
+      // Open the directory to everyone so that only the HBase permissions decide the outcome.
+      fs.setPermission(openDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+      final Path output = fs.makeQualified(new Path(openDir, "output"));
+      AccessTestAction exportAction = () -> {
+        try {
+          String[] args = new String[]{exportTable, output.toString()};
+          Map<byte[], Export.Response> result
+                  = Export.run(new Configuration(UTIL.getConfiguration()), args);
+          long rowCount = 0;
+          long cellCount = 0;
+          for (Export.Response r : result.values()) {
+            rowCount += r.getRowCount();
+            cellCount += r.getCellCount();
+          }
+          // putAction wrote a single row holding two cells.
+          assertEquals(1, rowCount);
+          assertEquals(2, cellCount);
+          return null;
+        } catch (ServiceException | IOException ex) {
+          throw ex;
+        } catch (Throwable ex) {
+          // Pass the throwable as the cause so that the stack trace is logged as well.
+          LOG.error("Failed to export " + exportTable, ex);
+          throw new Exception(ex);
+        } finally {
+          // Every user starts from an empty output directory.
+          clearOutput(output);
+        }
+      };
+      SecureTestUtil.verifyDenied(exportAction,
+        getUserByLogin(USER_RO),
+        getUserByLogin(USER_XO),
+        getUserByLogin(USER_NONE));
+      SecureTestUtil.verifyAllowed(exportAction,
+        getUserByLogin(USER_ADMIN),
+        getUserByLogin(USER_OWNER),
+        getUserByLogin(USER_RX));
+      AccessTestAction deleteAction = () -> {
+        UTIL.deleteTable(exportTableName);
+        return null;
+      };
+      SecureTestUtil.verifyAllowed(deleteAction, getUserByLogin(USER_OWNER));
+    } finally {
+      // Remove the shared directory even when one of the verifications above fails.
+      fs.delete(openDir, true);
+    }
+  }
+  /**
+   * Exports a table whose three rows carry different visibility expressions, once per
+   * label set, imports each export into a fresh table and checks how many rows a scan
+   * using the same labels as authorizations returns.
+   *
+   * @throws Throwable if a verification fails or the cluster reports an unexpected error
+   */
+  @Test
+  public void testVisibilityLabels() throws Throwable {
+    final String exportTable = name.getMethodName() + "_export";
+    final String importTable = name.getMethodName() + "_import";
+    final TableDescriptor exportHtd = TableDescriptorBuilder.newBuilder(TableName.valueOf(exportTable))
+            .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYA))
+            .setOwnerString(USER_OWNER)
+            .build();
+    SecureTestUtil.createTable(UTIL, exportHtd, new byte[][]{Bytes.toBytes("s")});
+    AccessTestAction putAction = () -> {
+      Put p1 = new Put(ROW1);
+      p1.addColumn(FAMILYA, QUAL, NOW, QUAL);
+      p1.setCellVisibility(new CellVisibility(SECRET));
+      Put p2 = new Put(ROW2);
+      p2.addColumn(FAMILYA, QUAL, NOW, QUAL);
+      p2.setCellVisibility(new CellVisibility(PRIVATE + " & " + CONFIDENTIAL));
+      Put p3 = new Put(ROW3);
+      p3.addColumn(FAMILYA, QUAL, NOW, QUAL);
+      p3.setCellVisibility(new CellVisibility("!" + CONFIDENTIAL + " & " + TOPSECRET));
+      try (Connection conn = ConnectionFactory.createConnection(UTIL.getConfiguration());
+              Table t = conn.getTable(TableName.valueOf(exportTable))) {
+        t.put(p1);
+        t.put(p2);
+        t.put(p3);
+      }
+      return null;
+    };
+    SecureTestUtil.verifyAllowed(putAction, getUserByLogin(USER_OWNER));
+    // Each pair holds a label set and the number of rows expected to be visible with it.
+    List<Pair<List<String>, Integer>> labelsAndRowCounts = new LinkedList<>();
+    labelsAndRowCounts.add(new Pair<>(Arrays.asList(SECRET), 1));
+    labelsAndRowCounts.add(new Pair<>(Arrays.asList(PRIVATE, CONFIDENTIAL), 1));
+    labelsAndRowCounts.add(new Pair<>(Arrays.asList(TOPSECRET), 1));
+    labelsAndRowCounts.add(new Pair<>(Arrays.asList(TOPSECRET, CONFIDENTIAL), 0));
+    labelsAndRowCounts.add(new Pair<>(Arrays.asList(TOPSECRET, CONFIDENTIAL, PRIVATE, SECRET), 2));
+    // Create an open-permission directory once; every iteration exports below it.
+    final Path openDir = new Path(name.getMethodName());
+    final FileSystem fs = openDir.getFileSystem(UTIL.getConfiguration());
+    fs.mkdirs(openDir);
+    try {
+      fs.setPermission(openDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+      final Path output = fs.makeQualified(new Path(openDir, "output"));
+      for (final Pair<List<String>, Integer> labelsAndRowCount : labelsAndRowCounts) {
+        final List<String> labels = labelsAndRowCount.getFirst();
+        final int rowCount = labelsAndRowCount.getSecond();
+        final String joinedLabels = String.join(",", labels);
+        AccessTestAction exportAction = () -> {
+          try {
+            String[] args = new String[]{
+              "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + joinedLabels,
+              exportTable,
+              output.toString(),};
+            Export.run(new Configuration(UTIL.getConfiguration()), args);
+            return null;
+          } catch (ServiceException | IOException ex) {
+            throw ex;
+          } catch (Throwable ex) {
+            throw new Exception(ex);
+          }
+        };
+        SecureTestUtil.verifyAllowed(exportAction, getUserByLogin(USER_OWNER));
+        final TableDescriptor importHtd = TableDescriptorBuilder.newBuilder(TableName.valueOf(importTable))
+                .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYB))
+                .setOwnerString(USER_OWNER)
+                .build();
+        SecureTestUtil.createTable(UTIL, importHtd, new byte[][]{Bytes.toBytes("s")});
+        AccessTestAction importAction = () -> {
+          String[] args = new String[]{
+            "-D" + Import.CF_RENAME_PROP + "=" + FAMILYA_STRING + ":" + FAMILYB_STRING,
+            importTable,
+            output.toString()
+          };
+          assertEquals(0, ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args));
+          return null;
+        };
+        SecureTestUtil.verifyAllowed(importAction, getUserByLogin(USER_OWNER));
+        AccessTestAction scanAction = () -> {
+          Scan scan = new Scan();
+          scan.setAuthorizations(new Authorizations(labels));
+          try (Connection conn = ConnectionFactory.createConnection(UTIL.getConfiguration());
+                  Table table = conn.getTable(importHtd.getTableName());
+                  ResultScanner scanner = table.getScanner(scan)) {
+            // Only the number of returned rows matters, not their content.
+            int count = 0;
+            for (Result ignored : scanner) {
+              ++count;
+            }
+            assertEquals(rowCount, count);
+          }
+          return null;
+        };
+        SecureTestUtil.verifyAllowed(scanAction, getUserByLogin(USER_OWNER));
+        // Drop the import table and the output so that the next label set starts clean.
+        AccessTestAction deleteAction = () -> {
+          UTIL.deleteTable(importHtd.getTableName());
+          return null;
+        };
+        SecureTestUtil.verifyAllowed(deleteAction, getUserByLogin(USER_OWNER));
+        clearOutput(output);
+      }
+    } finally {
+      // Remove the shared directory even when one of the verifications above fails.
+      fs.delete(openDir, true);
+    }
+    AccessTestAction deleteAction = () -> {
+      UTIL.deleteTable(exportHtd.getTableName());
+      return null;
+    };
+    SecureTestUtil.verifyAllowed(deleteAction, getUserByLogin(USER_OWNER));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/ExportEndpointExample.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/ExportEndpointExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/ExportEndpointExample.java
new file mode 100644
index 0000000..cc06844
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/ExportEndpointExample.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.example;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.Export;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A simple example on how to use {@link org.apache.hadoop.hbase.coprocessor.Export}.
+ *
+ * <p>
+ * For the protocol buffer definition of the ExportService, see the source file located under
+ * hbase-endpoint/src/main/protobuf/Export.proto.
+ * </p>
+ */
+public class ExportEndpointExample {
+
+  /**
+   * Creates a table with the export endpoint mounted, writes 100 rows to it, exports the
+   * table to a fixed output path and prints the exported row and cell totals.
+   */
+  public static void main(String[] args) throws Throwable {
+    final int rowCount = 100;
+    final byte[] family = Bytes.toBytes("family");
+    final Configuration conf = HBaseConfiguration.create();
+    final TableName tableName = TableName.valueOf("ExportEndpointExample");
+    try (Connection con = ConnectionFactory.createConnection(conf);
+         Admin admin = con.getAdmin()) {
+      TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
+              // MUST mount the export endpoint
+              .addCoprocessor(Export.class.getName())
+              .addColumnFamily(ColumnFamilyDescriptorBuilder.of(family))
+              .build();
+      admin.createTable(desc);
+
+      // Each row uses its index as row key, qualifier and value.
+      List<Put> puts = new ArrayList<>(rowCount);
+      for (int row = 0; row < rowCount; row++) {
+        byte[] rowBytes = Bytes.toBytes(row);
+        puts.add(new Put(rowBytes).addColumn(family, rowBytes, rowBytes));
+      }
+      try (Table table = con.getTable(tableName)) {
+        table.put(puts);
+      }
+
+      Path output = new Path("/tmp/ExportEndpointExample_output");
+      Map<byte[], Export.Response> result = Export.run(conf, tableName, new Scan(), output);
+      // Sum the counts over all entries of the result map.
+      long totalOutputRows = 0;
+      long totalOutputCells = 0;
+      for (Export.Response response : result.values()) {
+        totalOutputRows += response.getRowCount();
+        totalOutputCells += response.getCellCount();
+      }
+      System.out.println("table:" + tableName);
+      System.out.println("output:" + output);
+      System.out.println("total rows:" + totalOutputRows);
+      System.out.println("total cells:" + totalOutputCells);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
index de6cf3a..b2dc254 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
@@ -20,23 +20,17 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.filter.RegexStringComparator;
-import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ArrayUtils;
+import org.apache.hadoop.hbase.util.Triple;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
@@ -50,12 +44,8 @@ import org.apache.hadoop.util.ToolRunner;
  */
 @InterfaceAudience.Public
 public class Export extends Configured implements Tool {
-  private static final Log LOG = LogFactory.getLog(Export.class);
-  final static String NAME = "export";
-  final static String RAW_SCAN = "hbase.mapreduce.include.deleted.rows";
-  final static String EXPORT_BATCHING = "hbase.export.scanner.batch";
-
-  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+  static final String NAME = "export";
+  static final String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
   /**
    * Sets up the actual job.
@@ -67,13 +57,14 @@ public class Export extends Configured implements Tool {
    */
   public static Job createSubmittableJob(Configuration conf, String[] args)
   throws IOException {
-    String tableName = args[0];
-    Path outputDir = new Path(args[1]);
+    Triple<TableName, Scan, Path> arguments = ExportUtils.getArgumentsFromCommandLine(conf, args);
+    String tableName = arguments.getFirst().getNameAsString();
+    Path outputDir = arguments.getThird();
     Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
     job.setJobName(NAME + "_" + tableName);
     job.setJarByClass(Export.class);
     // Set optional scan parameters
-    Scan s = getConfiguredScanForJob(conf, args);
+    Scan s = arguments.getSecond();
     IdentityTableMapper.initJob(tableName, s, IdentityTableMapper.class, job);
     // No reducers.  Just write straight to output files.
     job.setNumReduceTasks(0);
@@ -84,101 +75,15 @@ public class Export extends Configured implements Tool {
     return job;
   }
 
-  private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
-    Scan s = new Scan();
-    // Optional arguments.
-    // Set Scan Versions
-    int versions = args.length > 2? Integer.parseInt(args[2]): 1;
-    s.setMaxVersions(versions);
-    // Set Scan Range
-    long startTime = args.length > 3? Long.parseLong(args[3]): 0L;
-    long endTime = args.length > 4? Long.parseLong(args[4]): Long.MAX_VALUE;
-    s.setTimeRange(startTime, endTime);
-    // Set cache blocks
-    s.setCacheBlocks(false);
-    // set Start and Stop row
-    if (conf.get(TableInputFormat.SCAN_ROW_START) != null) {
-      s.setStartRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_START)));
-    }
-    if (conf.get(TableInputFormat.SCAN_ROW_STOP) != null) {
-      s.setStopRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_STOP)));
-    }
-    // Set Scan Column Family
-    boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN));
-    if (raw) {
-      s.setRaw(raw);
-    }
-    for (String columnFamily : conf.getTrimmedStrings(TableInputFormat.SCAN_COLUMN_FAMILY)) {
-      s.addFamily(Bytes.toBytes(columnFamily));
-    }
-    // Set RowFilter or Prefix Filter if applicable.
-    Filter exportFilter = getExportFilter(args);
-    if (exportFilter!= null) {
-        LOG.info("Setting Scan Filter for Export.");
-      s.setFilter(exportFilter);
-    }
-
-    int batching = conf.getInt(EXPORT_BATCHING, -1);
-    if (batching !=  -1){
-      try {
-        s.setBatch(batching);
-      } catch (IncompatibleFilterException e) {
-        LOG.error("Batching could not be set", e);
-      }
-    }
-    LOG.info("versions=" + versions + ", starttime=" + startTime +
-      ", endtime=" + endTime + ", keepDeletedCells=" + raw);
-    return s;
-  }
-
-  private static Filter getExportFilter(String[] args) {
-    Filter exportFilter = null;
-    String filterCriteria = (args.length > 5) ? args[5]: null;
-    if (filterCriteria == null) return null;
-    if (filterCriteria.startsWith("^")) {
-      String regexPattern = filterCriteria.substring(1, filterCriteria.length());
-      exportFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regexPattern));
-    } else {
-      exportFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria));
-    }
-    return exportFilter;
-  }
-
-  /*
-   * @param errorMsg Error message.  Can be null.
-   */
-  private static void usage(final String errorMsg) {
-    if (errorMsg != null && errorMsg.length() > 0) {
-      System.err.println("ERROR: " + errorMsg);
-    }
-    System.err.println("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
-      "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]\n");
-    System.err.println("  Note: -D properties will be applied to the conf used. ");
-    System.err.println("  For example: ");
-    System.err.println("   -D mapreduce.output.fileoutputformat.compress=true");
-    System.err.println("   -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec");
-    System.err.println("   -D mapreduce.output.fileoutputformat.compress.type=BLOCK");
-    System.err.println("  Additionally, the following SCAN properties can be specified");
-    System.err.println("  to control/limit what is exported..");
-    System.err.println("   -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ...");
-    System.err.println("   -D " + RAW_SCAN + "=true");
-    System.err.println("   -D " + TableInputFormat.SCAN_ROW_START + "=<ROWSTART>");
-    System.err.println("   -D " + TableInputFormat.SCAN_ROW_STOP + "=<ROWSTOP>");
-    System.err.println("   -D " + JOB_NAME_CONF_KEY
-        + "=jobName - use the specified mapreduce job name for the export");
-    System.err.println("For performance consider the following properties:\n"
-        + "   -Dhbase.client.scanner.caching=100\n"
-        + "   -Dmapreduce.map.speculative=false\n"
-        + "   -Dmapreduce.reduce.speculative=false");
-    System.err.println("For tables with very wide rows consider setting the batch size as below:\n"
-        + "   -D" + EXPORT_BATCHING + "=10");
-  }
-
-
   @Override
   public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      usage("Wrong number of arguments: " + args.length);
+    if (!ExportUtils.isValidArguements(args)) {
+      ExportUtils.usage("Wrong number of arguments: " + ArrayUtils.length(args));
+      System.err.println("   -D " + JOB_NAME_CONF_KEY
+              + "=jobName - use the specified mapreduce job name for the export");
+      System.err.println("For MR performance consider the following properties:");
+      System.err.println("   -D mapreduce.map.speculative=false");
+      System.err.println("   -D mapreduce.reduce.speculative=false");
       return -1;
     }
     Job job = createSubmittableJob(getConf(), args);

http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExportUtils.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExportUtils.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExportUtils.java
new file mode 100644
index 0000000..9cc2a80
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExportUtils.java
@@ -0,0 +1,175 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.security.visibility.Authorizations;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Triple;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Helper methods shared by {@link org.apache.hadoop.hbase.mapreduce.Export}
+ * and org.apache.hadoop.hbase.coprocessor.Export (in hbase-endpoint).
+ */
+@InterfaceAudience.Private
+public final class ExportUtils {
+  private static final Log LOG = LogFactory.getLog(ExportUtils.class);
+  public static final String RAW_SCAN = "hbase.mapreduce.include.deleted.rows";
+  public static final String EXPORT_BATCHING = "hbase.export.scanner.batch";
+  public static final String EXPORT_CACHING = "hbase.export.scanner.caching";
+  public static final String EXPORT_VISIBILITY_LABELS = "hbase.export.visibility.labels";
+  /**
+   * Prints to stderr the usage message shared by the export tools.
+   * @param errorMsg Error message.  Can be null.
+   */
+  public static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
+      "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]\n");
+    System.err.println("  Note: -D properties will be applied to the conf used. ");
+    System.err.println("  For example: ");
+    System.err.println("   -D " + FileOutputFormat.COMPRESS + "=true");
+    System.err.println("   -D " + FileOutputFormat.COMPRESS_CODEC + "=org.apache.hadoop.io.compress.GzipCodec");
+    System.err.println("   -D " + FileOutputFormat.COMPRESS_TYPE + "=BLOCK");
+    System.err.println("  Additionally, the following SCAN properties can be specified");
+    System.err.println("  to control/limit what is exported..");
+    System.err.println("   -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ...");
+    System.err.println("   -D " + RAW_SCAN + "=true");
+    System.err.println("   -D " + TableInputFormat.SCAN_ROW_START + "=<ROWSTART>");
+    System.err.println("   -D " + TableInputFormat.SCAN_ROW_STOP + "=<ROWSTOP>");
+    System.err.println("   -D " + HConstants.HBASE_CLIENT_SCANNER_CACHING + "=100");
+    System.err.println("   -D " + EXPORT_VISIBILITY_LABELS + "=<labels>");
+    System.err.println("For tables with very wide rows consider setting the batch size as below:\n"
+            + "   -D " + EXPORT_BATCHING + "=10\n"
+            + "   -D " + EXPORT_CACHING + "=100");
+  }
+
+  private static Filter getExportFilter(String[] args) {
+    Filter exportFilter;
+    String filterCriteria = (args.length > 5) ? args[5]: null;
+    if (filterCriteria == null) return null;
+    if (filterCriteria.startsWith("^")) {
+      String regexPattern = filterCriteria.substring(1, filterCriteria.length());
+      exportFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regexPattern));
+    } else {
+      exportFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria));
+    }
+    return exportFilter;
+  }
+
+  public static boolean isValidArguements(String[] args) {
+    return args != null && args.length >= 2;
+  }
+
+  public static Triple<TableName, Scan, Path> getArgumentsFromCommandLine(
+          Configuration conf, String[] args) throws IOException {
+    if (!isValidArguements(args)) {
+      return null;
+    }
+    return new Triple<>(TableName.valueOf(args[0]), getScanFromCommandLine(conf, args), new Path(args[1]));
+  }
+
+  @VisibleForTesting
+  static Scan getScanFromCommandLine(Configuration conf, String[] args) throws IOException {
+    Scan s = new Scan();
+    // Optional arguments.
+    // Set Scan Versions
+    int versions = args.length > 2? Integer.parseInt(args[2]): 1;
+    s.setMaxVersions(versions);
+    // Set Scan Time Range
+    long startTime = args.length > 3? Long.parseLong(args[3]): 0L;
+    long endTime = args.length > 4? Long.parseLong(args[4]): Long.MAX_VALUE;
+    s.setTimeRange(startTime, endTime);
+    // Set cache blocks
+    s.setCacheBlocks(false);
+    // set Start and Stop row
+    if (conf.get(TableInputFormat.SCAN_ROW_START) != null) {
+      s.setStartRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_START)));
+    }
+    if (conf.get(TableInputFormat.SCAN_ROW_STOP) != null) {
+      s.setStopRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_STOP)));
+    }
+    // Set raw scan if requested, then the Scan Column Families
+    boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN));
+    if (raw) {
+      s.setRaw(raw);
+    }
+    for (String columnFamily : conf.getTrimmedStrings(TableInputFormat.SCAN_COLUMN_FAMILY)) {
+      s.addFamily(Bytes.toBytes(columnFamily));
+    }
+    // Set RowFilter or Prefix Filter if applicable.
+    Filter exportFilter = getExportFilter(args);
+    if (exportFilter!= null) {
+        LOG.info("Setting Scan Filter for Export.");
+      s.setFilter(exportFilter);
+    }
+    List<String> labels = null;
+    if (conf.get(EXPORT_VISIBILITY_LABELS) != null) {
+      labels = Arrays.asList(conf.getStrings(EXPORT_VISIBILITY_LABELS));
+      if (!labels.isEmpty()) {
+        s.setAuthorizations(new Authorizations(labels));
+      }
+    }
+
+    int batching = conf.getInt(EXPORT_BATCHING, -1);
+    if (batching != -1) {
+      try {
+        s.setBatch(batching);
+      } catch (IncompatibleFilterException e) {
+        LOG.error("Batching could not be set", e);
+      }
+    }
+
+    int caching = conf.getInt(EXPORT_CACHING, 100);
+    if (caching != -1) {
+      try {
+        s.setCaching(caching);
+      } catch (IncompatibleFilterException e) {
+        LOG.error("Caching could not be set", e);
+      }
+    }
+    LOG.info("versions=" + versions + ", starttime=" + startTime
+      + ", endtime=" + endTime + ", keepDeletedCells=" + raw
+      + ", visibility labels=" + labels);
+    return s;
+  }
+
+  private ExportUtils() {
+  }
+}