Posted to commits@hbase.apache.org by ch...@apache.org on 2017/08/30 06:11:19 UTC
[1/2] hbase git commit: HBASE-15806 An endpoint-based export tool
Repository: hbase
Updated Branches:
refs/heads/master 314d35702 -> 746597306
http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
index 91d2696..60d88bc 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
@@ -44,12 +44,11 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@@ -58,6 +57,8 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.PrefixFilter;
@@ -72,6 +73,7 @@ import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.AfterClass;
@@ -90,8 +92,9 @@ import org.mockito.stubbing.Answer;
*/
@Category({VerySlowMapReduceTests.class, MediumTests.class})
public class TestImportExport {
+
private static final Log LOG = LogFactory.getLog(TestImportExport.class);
- private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1");
private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2");
private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3");
@@ -104,10 +107,12 @@ public class TestImportExport {
private static String FQ_OUTPUT_DIR;
private static final String EXPORT_BATCH_SIZE = "100";
- private static long now = System.currentTimeMillis();
+ private static final long now = System.currentTimeMillis();
+ private final TableName EXPORT_TABLE = TableName.valueOf("export_table");
+ private final TableName IMPORT_TABLE = TableName.valueOf("import_table");
@BeforeClass
- public static void beforeClass() throws Exception {
+ public static void beforeClass() throws Throwable {
// Up the handlers; this test needs more than usual.
UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
UTIL.startMiniCluster();
@@ -116,7 +121,7 @@ public class TestImportExport {
}
@AfterClass
- public static void afterClass() throws Exception {
+ public static void afterClass() throws Throwable {
UTIL.shutdownMiniCluster();
}
@@ -128,11 +133,16 @@ public class TestImportExport {
LOG.info("Running " + name.getMethodName());
}
- @Before
@After
- public void cleanup() throws Exception {
+ public void cleanup() throws Throwable {
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
fs.delete(new Path(OUTPUT_DIR), true);
+ if (UTIL.getAdmin().tableExists(EXPORT_TABLE)) {
+ UTIL.deleteTable(EXPORT_TABLE);
+ }
+ if (UTIL.getAdmin().tableExists(IMPORT_TABLE)) {
+ UTIL.deleteTable(IMPORT_TABLE);
+ }
}
/**
@@ -143,12 +153,16 @@ public class TestImportExport {
* @throws InterruptedException
* @throws ClassNotFoundException
*/
- boolean runExport(String[] args) throws Exception {
+ protected boolean runExport(String[] args) throws Throwable {
// need to make a copy of the configuration because to make sure different temp dirs are used.
int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args);
return status == 0;
}
+ protected void runExportMain(String[] args) throws Throwable {
+ Export.main(args);
+ }
+
/**
* Runs an import job with the specified command line args
* @param args
@@ -157,7 +171,7 @@ public class TestImportExport {
* @throws InterruptedException
* @throws ClassNotFoundException
*/
- boolean runImport(String[] args) throws Exception {
+ boolean runImport(String[] args) throws Throwable {
// need to make a copy of the configuration because to make sure different temp dirs are used.
int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args);
return status == 0;
@@ -168,7 +182,7 @@ public class TestImportExport {
* @throws Exception
*/
@Test
- public void testSimpleCase() throws Exception {
+ public void testSimpleCase() throws Throwable {
try (Table t = UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILYA, 3);) {
Put p = new Put(ROW1);
p.addColumn(FAMILYA, QUAL, now, QUAL);
@@ -223,21 +237,21 @@ public class TestImportExport {
/**
* Test export hbase:meta table
*
- * @throws Exception
+ * @throws Throwable
*/
@Test
- public void testMetaExport() throws Exception {
- String EXPORT_TABLE = TableName.META_TABLE_NAME.getNameAsString();
- String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" };
+ public void testMetaExport() throws Throwable {
+ String[] args = new String[] { TableName.META_TABLE_NAME.getNameAsString(),
+ FQ_OUTPUT_DIR, "1", "0", "0" };
assertTrue(runExport(args));
}
/**
* Test import data from 0.94 exported file
- * @throws Exception
+ * @throws Throwable
*/
@Test
- public void testImport94Table() throws Exception {
+ public void testImport94Table() throws Throwable {
final String name = "exportedTableIn94Format";
URL url = TestImportExport.class.getResource(name);
File f = new File(url.toURI());
@@ -273,11 +287,13 @@ public class TestImportExport {
* Test export scanner batching
*/
@Test
- public void testExportScannerBatching() throws Exception {
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
- desc.addFamily(new HColumnDescriptor(FAMILYA)
- .setMaxVersions(1)
- );
+ public void testExportScannerBatching() throws Throwable {
+ TableDescriptor desc = TableDescriptorBuilder
+ .newBuilder(TableName.valueOf(name.getMethodName()))
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
+ .setMaxVersions(1)
+ .build())
+ .build();
UTIL.getAdmin().createTable(desc);
try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
@@ -290,7 +306,7 @@ public class TestImportExport {
t.put(p);
String[] args = new String[] {
- "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added scanner batching arg.
+ "-D" + ExportUtils.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added scanner batching arg.
name.getMethodName(),
FQ_OUTPUT_DIR
};
@@ -302,12 +318,14 @@ public class TestImportExport {
}
@Test
- public void testWithDeletes() throws Exception {
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
- desc.addFamily(new HColumnDescriptor(FAMILYA)
- .setMaxVersions(5)
- .setKeepDeletedCells(KeepDeletedCells.TRUE)
- );
+ public void testWithDeletes() throws Throwable {
+ TableDescriptor desc = TableDescriptorBuilder
+ .newBuilder(TableName.valueOf(name.getMethodName()))
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
+ .setMaxVersions(5)
+ .setKeepDeletedCells(KeepDeletedCells.TRUE)
+ .build())
+ .build();
UTIL.getAdmin().createTable(desc);
try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
@@ -327,7 +345,7 @@ public class TestImportExport {
}
String[] args = new String[] {
- "-D" + Export.RAW_SCAN + "=true",
+ "-D" + ExportUtils.RAW_SCAN + "=true",
name.getMethodName(),
FQ_OUTPUT_DIR,
"1000", // max number of key versions per key to export
@@ -335,11 +353,13 @@ public class TestImportExport {
assertTrue(runExport(args));
final String IMPORT_TABLE = name.getMethodName() + "import";
- desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
- desc.addFamily(new HColumnDescriptor(FAMILYA)
- .setMaxVersions(5)
- .setKeepDeletedCells(KeepDeletedCells.TRUE)
- );
+ desc = TableDescriptorBuilder
+ .newBuilder(TableName.valueOf(IMPORT_TABLE))
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
+ .setMaxVersions(5)
+ .setKeepDeletedCells(KeepDeletedCells.TRUE)
+ .build())
+ .build();
UTIL.getAdmin().createTable(desc);
try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
args = new String[] {
@@ -366,13 +386,15 @@ public class TestImportExport {
@Test
- public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Exception {
+ public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Throwable {
final TableName exportTable = TableName.valueOf(name.getMethodName());
- HTableDescriptor desc = new HTableDescriptor(exportTable);
- desc.addFamily(new HColumnDescriptor(FAMILYA)
- .setMaxVersions(5)
- .setKeepDeletedCells(KeepDeletedCells.TRUE)
- );
+ TableDescriptor desc = TableDescriptorBuilder
+ .newBuilder(TableName.valueOf(name.getMethodName()))
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
+ .setMaxVersions(5)
+ .setKeepDeletedCells(KeepDeletedCells.TRUE)
+ .build())
+ .build();
UTIL.getAdmin().createTable(desc);
Table exportT = UTIL.getConnection().getTable(exportTable);
@@ -397,18 +419,20 @@ public class TestImportExport {
String[] args = new String[] {
- "-D" + Export.RAW_SCAN + "=true", exportTable.getNameAsString(),
+ "-D" + ExportUtils.RAW_SCAN + "=true", exportTable.getNameAsString(),
FQ_OUTPUT_DIR,
"1000", // max number of key versions per key to export
};
assertTrue(runExport(args));
final String importTable = name.getMethodName() + "import";
- desc = new HTableDescriptor(TableName.valueOf(importTable));
- desc.addFamily(new HColumnDescriptor(FAMILYA)
- .setMaxVersions(5)
- .setKeepDeletedCells(KeepDeletedCells.TRUE)
- );
+ desc = TableDescriptorBuilder
+ .newBuilder(TableName.valueOf(importTable))
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
+ .setMaxVersions(5)
+ .setKeepDeletedCells(KeepDeletedCells.TRUE)
+ .build())
+ .build();
UTIL.getAdmin().createTable(desc);
Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable));
@@ -429,7 +453,7 @@ public class TestImportExport {
Result exportedTResult = exportedTScanner.next();
try {
Result.compareResults(exportedTResult, importedTResult);
- } catch (Exception e) {
+ } catch (Throwable e) {
fail("Original and imported tables data comparision failed with error:"+e.getMessage());
} finally {
exportT.close();
@@ -442,10 +466,14 @@ public class TestImportExport {
* attempt with invalid values.
*/
@Test
- public void testWithFilter() throws Exception {
+ public void testWithFilter() throws Throwable {
// Create simple table to export
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
- desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
+ TableDescriptor desc = TableDescriptorBuilder
+ .newBuilder(TableName.valueOf(name.getMethodName()))
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
+ .setMaxVersions(5)
+ .build())
+ .build();
UTIL.getAdmin().createTable(desc);
Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
@@ -468,8 +496,12 @@ public class TestImportExport {
// Import to a new table
final String IMPORT_TABLE = name.getMethodName() + "import";
- desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
- desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
+ desc = TableDescriptorBuilder
+ .newBuilder(TableName.valueOf(IMPORT_TABLE))
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
+ .setMaxVersions(5)
+ .build())
+ .build();
UTIL.getAdmin().createTable(desc);
Table importTable = UTIL.getConnection().getTable(desc.getTableName());
@@ -501,8 +533,6 @@ public class TestImportExport {
/**
* Count the number of keyvalues in the specified table for the given timerange
- * @param start
- * @param end
* @param table
* @return
* @throws IOException
@@ -523,7 +553,7 @@ public class TestImportExport {
* test main method. Import should print help and call System.exit
*/
@Test
- public void testImportMain() throws Exception {
+ public void testImportMain() throws Throwable {
PrintStream oldPrintStream = System.err;
SecurityManager SECURITY_MANAGER = System.getSecurityManager();
LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
@@ -548,11 +578,56 @@ public class TestImportExport {
}
}
+ @Test
+ public void testExportScan() throws Exception {
+ int version = 100;
+ long startTime = System.currentTimeMillis();
+ long endTime = startTime + 1;
+ String prefix = "row";
+ String label_0 = "label_0";
+ String label_1 = "label_1";
+ String[] args = {
+ "table",
+ "outputDir",
+ String.valueOf(version),
+ String.valueOf(startTime),
+ String.valueOf(endTime),
+ prefix
+ };
+ Scan scan = ExportUtils.getScanFromCommandLine(UTIL.getConfiguration(), args);
+ assertEquals(version, scan.getMaxVersions());
+ assertEquals(startTime, scan.getTimeRange().getMin());
+ assertEquals(endTime, scan.getTimeRange().getMax());
+ assertEquals(true, (scan.getFilter() instanceof PrefixFilter));
+ assertEquals(0, Bytes.compareTo(((PrefixFilter) scan.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
+ String[] argsWithLabels = {
+ "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + label_0 + "," + label_1,
+ "table",
+ "outputDir",
+ String.valueOf(version),
+ String.valueOf(startTime),
+ String.valueOf(endTime),
+ prefix
+ };
+ Configuration conf = new Configuration(UTIL.getConfiguration());
+ // parse the "-D" options
+ String[] otherArgs = new GenericOptionsParser(conf, argsWithLabels).getRemainingArgs();
+ Scan scanWithLabels = ExportUtils.getScanFromCommandLine(conf, otherArgs);
+ assertEquals(version, scanWithLabels.getMaxVersions());
+ assertEquals(startTime, scanWithLabels.getTimeRange().getMin());
+ assertEquals(endTime, scanWithLabels.getTimeRange().getMax());
+ assertEquals(true, (scanWithLabels.getFilter() instanceof PrefixFilter));
+ assertEquals(0, Bytes.compareTo(((PrefixFilter) scanWithLabels.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
+ assertEquals(2, scanWithLabels.getAuthorizations().getLabels().size());
+ assertEquals(label_0, scanWithLabels.getAuthorizations().getLabels().get(0));
+ assertEquals(label_1, scanWithLabels.getAuthorizations().getLabels().get(1));
+ }
+
/**
* test main method. Export should print help and call System.exit
*/
@Test
- public void testExportMain() throws Exception {
+ public void testExportMain() throws Throwable {
PrintStream oldPrintStream = System.err;
SecurityManager SECURITY_MANAGER = System.getSecurityManager();
LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
@@ -562,7 +637,7 @@ public class TestImportExport {
System.setErr(new PrintStream(data));
try {
System.setErr(new PrintStream(data));
- Export.main(args);
+ runExportMain(args);
fail("should be SecurityException");
} catch (SecurityException e) {
assertEquals(-1, newSecurityManager.getExitCode());
@@ -574,10 +649,9 @@ public class TestImportExport {
assertTrue(
errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ..."));
assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true"));
- assertTrue(errMsg.contains("-Dhbase.client.scanner.caching=100"));
- assertTrue(errMsg.contains("-Dmapreduce.map.speculative=false"));
- assertTrue(errMsg.contains("-Dmapreduce.reduce.speculative=false"));
- assertTrue(errMsg.contains("-Dhbase.export.scanner.batch=10"));
+ assertTrue(errMsg.contains("-D hbase.client.scanner.caching=100"));
+ assertTrue(errMsg.contains("-D hbase.export.scanner.batch=10"));
+ assertTrue(errMsg.contains("-D hbase.export.scanner.caching=100"));
} finally {
System.setErr(oldPrintStream);
System.setSecurityManager(SECURITY_MANAGER);
@@ -589,7 +663,7 @@ public class TestImportExport {
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
- public void testKeyValueImporter() throws Exception {
+ public void testKeyValueImporter() throws Throwable {
KeyValueImporter importer = new KeyValueImporter();
Configuration configuration = new Configuration();
Context ctx = mock(Context.class);
@@ -638,7 +712,7 @@ public class TestImportExport {
}
@Test
- public void testDurability() throws Exception {
+ public void testDurability() throws Throwable {
// Create an export table.
String exportTableName = name.getMethodName() + "export";
try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
index 126c4e4..19cbc38 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
@@ -168,7 +168,7 @@ public class SecureTestUtil {
* To indicate the action was not allowed, either throw an AccessDeniedException
* or return an empty list of KeyValues.
*/
- protected static interface AccessTestAction extends PrivilegedExceptionAction<Object> { }
+ public interface AccessTestAction extends PrivilegedExceptionAction<Object> { }
/** This fails only in case of ADE or empty list for any of the actions. */
public static void verifyAllowed(User user, AccessTestAction... actions) throws Exception {
[2/2] hbase git commit: HBASE-15806 An endpoint-based export tool
Posted by ch...@apache.org.
HBASE-15806 An endpoint-based export tool
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/74659730
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/74659730
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/74659730
Branch: refs/heads/master
Commit: 7465973068f45a78d66394cad7c0858bfeda1b46
Parents: 314d357
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Wed Aug 30 14:06:04 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Wed Aug 30 14:06:04 2017 +0800
----------------------------------------------------------------------
hbase-endpoint/pom.xml | 17 +
.../apache/hadoop/hbase/coprocessor/Export.java | 538 +++++++++++++++++++
hbase-endpoint/src/main/protobuf/Export.proto | 45 ++
.../hbase/coprocessor/TestImportExport.java | 56 ++
.../hbase/coprocessor/TestSecureExport.java | 439 +++++++++++++++
.../client/example/ExportEndpointExample.java | 86 +++
.../apache/hadoop/hbase/mapreduce/Export.java | 131 +----
.../hadoop/hbase/mapreduce/ExportUtils.java | 175 ++++++
.../hbase/mapreduce/TestImportExport.java | 202 ++++---
.../hbase/security/access/SecureTestUtil.java | 2 +-
10 files changed, 1513 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-endpoint/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-endpoint/pom.xml b/hbase-endpoint/pom.xml
index 29bd33b..e37272f 100644
--- a/hbase-endpoint/pom.xml
+++ b/hbase-endpoint/pom.xml
@@ -166,6 +166,23 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <!-- The coprocessor.Export needs mapreduce.Import and mapreduce.Export to run the unit tests -->
+ <!-- see org.apache.hadoop.hbase.coprocessor.TestImportExport -->
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-mapreduce</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-mapreduce</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minikdc</artifactId>
+ <scope>test</scope>
+ </dependency>
<!-- General dependencies -->
<dependency>
<groupId>commons-logging</groupId>
http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java
new file mode 100644
index 0000000..7a7cc00
--- /dev/null
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import java.io.Closeable;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.mapreduce.ExportUtils;
+import org.apache.hadoop.hbase.mapreduce.Import;
+import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken;
+import org.apache.hadoop.hbase.protobuf.generated.ExportProtos;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.FsDelegationToken;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.util.ArrayUtils;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Triple;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Export an HBase table. Writes content to sequence files up in HDFS. Use
+ * {@link Import} to read it back in again. It is implemented as a coprocessor
+ * endpoint.
+ *
+ * @see org.apache.hadoop.hbase.mapreduce.Export
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public class Export extends ExportProtos.ExportService
+ implements Coprocessor, CoprocessorService {
+
+ private static final Log LOG = LogFactory.getLog(Export.class);
+ private static final Class<? extends CompressionCodec> DEFAULT_CODEC = DefaultCodec.class;
+ private static final SequenceFile.CompressionType DEFAULT_TYPE = SequenceFile.CompressionType.RECORD;
+ private RegionCoprocessorEnvironment env = null;
+ private UserProvider userProvider;
+
+ public static void main(String[] args) throws Throwable {
+ Map<byte[], Response> response = run(HBaseConfiguration.create(), args);
+ System.exit(response == null ? -1 : 0);
+ }
+
+ @VisibleForTesting
+ static Map<byte[], Response> run(final Configuration conf, final String[] args) throws Throwable {
+ String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+ if (!ExportUtils.isValidArguements(args)) {
+ ExportUtils.usage("Wrong number of arguments: " + ArrayUtils.length(otherArgs));
+ return null;
+ }
+ Triple<TableName, Scan, Path> arguments = ExportUtils.getArgumentsFromCommandLine(conf, otherArgs);
+ return run(conf, arguments.getFirst(), arguments.getSecond(), arguments.getThird());
+ }
+
+ public static Map<byte[], Response> run(final Configuration conf, TableName tableName, Scan scan, Path dir) throws Throwable {
+ FileSystem fs = dir.getFileSystem(conf);
+ UserProvider userProvider = UserProvider.instantiate(conf);
+ checkDir(fs, dir);
+ FsDelegationToken fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
+ fsDelegationToken.acquireDelegationToken(fs);
+ try {
+ final ExportProtos.ExportRequest request = getConfiguredRequest(conf, dir,
+ scan, fsDelegationToken.getUserToken());
+ try (Connection con = ConnectionFactory.createConnection(conf);
+ Table table = con.getTable(tableName)) {
+ Map<byte[], Response> result = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ table.coprocessorService(ExportProtos.ExportService.class,
+ scan.getStartRow(),
+ scan.getStopRow(),
+ (ExportProtos.ExportService service) -> {
+ ServerRpcController controller = new ServerRpcController();
+ Map<byte[], ExportProtos.ExportResponse> rval = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ CoprocessorRpcUtils.BlockingRpcCallback<ExportProtos.ExportResponse>
+ rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
+ service.export(controller, request, rpcCallback);
+ if (controller.failedOnException()) {
+ throw controller.getFailedOn();
+ }
+ return rpcCallback.get();
+ }).forEach((k, v) -> result.put(k, new Response(v)));
+ return result;
+ } catch (Throwable e) {
+ fs.delete(dir, true);
+ throw e;
+ }
+ } finally {
+ fsDelegationToken.releaseDelegationToken();
+ }
+ }
+
+ private static boolean getCompression(final ExportProtos.ExportRequest request) {
+ if (request.hasCompressed()) {
+ return request.getCompressed();
+ } else {
+ return false;
+ }
+ }
+
+ private static SequenceFile.CompressionType getCompressionType(final ExportProtos.ExportRequest request) {
+ if (request.hasCompressType()) {
+ return SequenceFile.CompressionType.valueOf(request.getCompressType());
+ } else {
+ return DEFAULT_TYPE;
+ }
+ }
+
+ private static CompressionCodec getCompressionCodec(final Configuration conf, final ExportProtos.ExportRequest request) {
+ try {
+ Class<? extends CompressionCodec> codecClass;
+ if (request.hasCompressCodec()) {
+ codecClass = conf.getClassByName(request.getCompressCodec()).asSubclass(CompressionCodec.class);
+ } else {
+ codecClass = DEFAULT_CODEC;
+ }
+ return ReflectionUtils.newInstance(codecClass, conf);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Compression codec "
+ + request.getCompressCodec() + " was not found.", e);
+ }
+ }
+
+ private static SequenceFile.Writer.Option getOutputPath(final Configuration conf,
+ final HRegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
+ Path file = new Path(request.getOutputPath(), "export-" + info.getEncodedName());
+ FileSystem fs = file.getFileSystem(conf);
+ if (fs.exists(file)) {
+ throw new IOException(file + " exists");
+ }
+ return SequenceFile.Writer.file(file);
+ }
+
+ private static List<SequenceFile.Writer.Option> getWriterOptions(final Configuration conf,
+ final HRegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
+ List<SequenceFile.Writer.Option> rval = new LinkedList<>();
+ rval.add(SequenceFile.Writer.keyClass(ImmutableBytesWritable.class));
+ rval.add(SequenceFile.Writer.valueClass(Result.class));
+ rval.add(getOutputPath(conf, info, request));
+ if (getCompression(request)) {
+ rval.add(SequenceFile.Writer.compression(getCompressionType(request), getCompressionCodec(conf, request)));
+ } else {
+ rval.add(SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
+ }
+ return rval;
+ }
+
+ private static ExportProtos.ExportResponse processData(final Region region, final Configuration conf,
+ final UserProvider userProvider, final Scan scan, final Token userToken,
+ final List<SequenceFile.Writer.Option> opts) throws IOException {
+ ScanCoprocessor cp = new ScanCoprocessor(region);
+ RegionScanner scanner = null;
+ try (RegionOp regionOp = new RegionOp(region);
+ SecureWriter out = new SecureWriter(conf, userProvider, userToken, opts)) {
+ scanner = cp.checkScannerOpen(scan);
+ ImmutableBytesWritable key = new ImmutableBytesWritable();
+ long rowCount = 0;
+ long cellCount = 0;
+ List<Result> results = new ArrayList<>();
+ List<Cell> cells = new ArrayList<>();
+ boolean hasMore;
+ do {
+ boolean bypass = cp.preScannerNext(scanner, results, scan.getBatch());
+ if (bypass) {
+ hasMore = false;
+ } else {
+ hasMore = scanner.nextRaw(cells);
+ if (cells.isEmpty()) {
+ continue;
+ }
+ Cell firstCell = cells.get(0);
+ for (Cell cell : cells) {
+ if (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength(),
+ cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) != 0) {
+ throw new IOException("Why the RegionScanner#nextRaw returns the data of different rows??"
+ + " first row=" + Bytes.toHex(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength())
+ + ", current row=" + Bytes.toHex(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+ }
+ }
+ results.add(Result.create(cells));
+ cells.clear();
+ cp.postScannerNext(scanner, results, scan.getBatch(), hasMore);
+ }
+ for (Result r : results) {
+ key.set(r.getRow());
+ out.append(key, r);
+ ++rowCount;
+ cellCount += r.size();
+ }
+ results.clear();
+ } while (hasMore);
+ return ExportProtos.ExportResponse.newBuilder()
+ .setRowCount(rowCount)
+ .setCellCount(cellCount)
+ .build();
+ } finally {
+ cp.checkScannerClose(scanner);
+ }
+ }
+
+ private static void checkDir(final FileSystem fs, final Path dir) throws IOException {
+ if (fs.exists(dir)) {
+ throw new RuntimeException("The " + dir + " exists");
+ }
+ if (!fs.mkdirs(dir)) {
+ throw new IOException("Failed to create the " + dir);
+ }
+ }
+
+ private static ExportProtos.ExportRequest getConfiguredRequest(Configuration conf,
+ Path dir, final Scan scan, final Token<?> userToken) throws IOException {
+ boolean compressed = conf.getBoolean(FileOutputFormat.COMPRESS, false);
+ String compressionType = conf.get(FileOutputFormat.COMPRESS_TYPE,
+ DEFAULT_TYPE.toString());
+ String compressionCodec = conf.get(FileOutputFormat.COMPRESS_CODEC,
+ DEFAULT_CODEC.getName());
+ DelegationToken protoToken = null;
+ if (userToken != null) {
+ protoToken = DelegationToken.newBuilder()
+ .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
+ .setPassword(ByteStringer.wrap(userToken.getPassword()))
+ .setKind(userToken.getKind().toString())
+ .setService(userToken.getService().toString()).build();
+ }
+ LOG.info("compressed=" + compressed
+ + ", compression type=" + compressionType
+ + ", compression codec=" + compressionCodec
+ + ", userToken=" + userToken);
+ ExportProtos.ExportRequest.Builder builder = ExportProtos.ExportRequest.newBuilder()
+ .setScan(ProtobufUtil.toScan(scan))
+ .setOutputPath(dir.toString())
+ .setCompressed(compressed)
+ .setCompressCodec(compressionCodec)
+ .setCompressType(compressionType);
+ if (protoToken != null) {
+ builder.setFsToken(protoToken);
+ }
+ return builder.build();
+ }
+
+
+ @Override
+ public void start(CoprocessorEnvironment environment) throws IOException {
+ if (environment instanceof RegionCoprocessorEnvironment) {
+ env = (RegionCoprocessorEnvironment) environment;
+ userProvider = UserProvider.instantiate(env.getConfiguration());
+ } else {
+ throw new CoprocessorException("Must be loaded on a table region!");
+ }
+ }
+
+ @Override
+ public void stop(CoprocessorEnvironment env) throws IOException {
+ }
+
+ @Override
+ public Service getService() {
+ return this;
+ }
+
+ @Override
+ public void export(RpcController controller, ExportProtos.ExportRequest request,
+ RpcCallback<ExportProtos.ExportResponse> done) {
+ Region region = env.getRegion();
+ Configuration conf = HBaseConfiguration.create(env.getConfiguration());
+ conf.setStrings("io.serializations", conf.get("io.serializations"), ResultSerialization.class.getName());
+ try {
+ Scan scan = validateKey(region.getRegionInfo(), request);
+ Token userToken = null;
+ if (userProvider.isHadoopSecurityEnabled() && !request.hasFsToken()) {
+ LOG.warn("Hadoop security is enable, but no found of user token");
+ } else if (userProvider.isHadoopSecurityEnabled()) {
+ userToken = new Token(request.getFsToken().getIdentifier().toByteArray(),
+ request.getFsToken().getPassword().toByteArray(),
+ new Text(request.getFsToken().getKind()),
+ new Text(request.getFsToken().getService()));
+ }
+ ExportProtos.ExportResponse response = processData(region, conf, userProvider,
+ scan, userToken, getWriterOptions(conf, region.getRegionInfo(), request));
+ done.run(response);
+ } catch (IOException e) {
+ CoprocessorRpcUtils.setControllerException(controller, e);
+ LOG.error(e);
+ }
+ }
+
+ private Scan validateKey(final HRegionInfo region, final ExportProtos.ExportRequest request) throws IOException {
+ Scan scan = ProtobufUtil.toScan(request.getScan());
+ byte[] regionStartKey = region.getStartKey();
+ byte[] originStartKey = scan.getStartRow();
+ if (originStartKey == null
+ || Bytes.compareTo(originStartKey, regionStartKey) < 0) {
+ scan.setStartRow(regionStartKey);
+ }
+ byte[] regionEndKey = region.getEndKey();
+ byte[] originEndKey = scan.getStopRow();
+ if (originEndKey == null
+ || Bytes.compareTo(originEndKey, regionEndKey) > 0) {
+ scan.setStopRow(regionEndKey);
+ }
+ return scan;
+ }
+
+ private static class RegionOp implements Closeable {
+
+ private final Region region;
+
+ RegionOp(final Region region) throws IOException {
+ this.region = region;
+ region.startRegionOperation();
+ }
+
+ @Override
+ public void close() throws IOException {
+ region.closeRegionOperation();
+ }
+ }
+
+ private static class ScanCoprocessor {
+
+ private final Region region;
+
+ ScanCoprocessor(final Region region) {
+ this.region = region;
+ }
+
+ RegionScanner checkScannerOpen(final Scan scan) throws IOException {
+ RegionScanner scanner;
+ if (region.getCoprocessorHost() == null) {
+ scanner = region.getScanner(scan);
+ } else {
+ scanner = region.getCoprocessorHost().preScannerOpen(scan);
+ if (scanner == null) {
+ scanner = region.getScanner(scan);
+ }
+ scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
+ }
+ if (scanner == null) {
+ throw new IOException("Failed to open region scanner");
+ }
+ return scanner;
+ }
+
+ void checkScannerClose(final InternalScanner s) throws IOException {
+ if (s == null) {
+ return;
+ }
+ if (region.getCoprocessorHost() == null) {
+ s.close();
+ return;
+ }
+ if (region.getCoprocessorHost().preScannerClose(s)) {
+ return;
+ }
+ try {
+ s.close();
+ } finally {
+ region.getCoprocessorHost().postScannerClose(s);
+ }
+ }
+
+ boolean preScannerNext(final InternalScanner s,
+ final List<Result> results, final int limit) throws IOException {
+ if (region.getCoprocessorHost() == null) {
+ return false;
+ } else {
+ Boolean bypass = region.getCoprocessorHost().preScannerNext(s, results, limit);
+ return bypass == null ? false : bypass;
+ }
+ }
+
+ boolean postScannerNext(final InternalScanner s,
+ final List<Result> results, final int limit, boolean hasMore)
+ throws IOException {
+ if (region.getCoprocessorHost() == null) {
+ return false;
+ } else {
+ return region.getCoprocessorHost().postScannerNext(s, results, limit, hasMore);
+ }
+ }
+ }
+
+ private static class SecureWriter implements Closeable {
+ private final PrivilegedWriter privilegedWriter;
+
+ SecureWriter(final Configuration conf, final UserProvider userProvider, final Token userToken,
+ final List<SequenceFile.Writer.Option> opts) throws IOException {
+ privilegedWriter = new PrivilegedWriter(getActiveUser(userProvider, userToken),
+ SequenceFile.createWriter(conf, opts.toArray(new SequenceFile.Writer.Option[opts.size()])));
+ }
+
+ void append(final Object key, final Object value) throws IOException {
+ privilegedWriter.append(key, value);
+ }
+
+ private static User getActiveUser(final UserProvider userProvider, final Token userToken) throws IOException {
+ User user = RpcServer.getRequestUser();
+ if (user == null) {
+ user = userProvider.getCurrent();
+ }
+ if (user == null && userToken != null) {
+ LOG.warn("No found of user credentials, but a token was got from user request");
+ } else if (user != null && userToken != null) {
+ user.addToken(userToken);
+ }
+ return user;
+ }
+
+ @Override
+ public void close() throws IOException {
+ privilegedWriter.close();
+ }
+ }
+
+ private static class PrivilegedWriter implements PrivilegedExceptionAction<Boolean>, Closeable {
+ private final User user;
+ private final SequenceFile.Writer out;
+ private Object key;
+ private Object value;
+
+ PrivilegedWriter(final User user, final SequenceFile.Writer out) {
+ this.user = user;
+ this.out = out;
+ }
+
+ void append(final Object key, final Object value) throws IOException {
+ if (user == null) {
+ out.append(key, value);
+ } else {
+ this.key = key;
+ this.value = value;
+ try {
+ user.runAs(this);
+ } catch (InterruptedException ex) {
+ throw new IOException(ex);
+ }
+ }
+ }
+
+ @Override
+ public Boolean run() throws Exception {
+ out.append(key, value);
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ out.close();
+ }
+ }
+
+ public static class Response {
+
+ private final long rowCount;
+ private final long cellCount;
+
+ private Response(ExportProtos.ExportResponse r) {
+ this.rowCount = r.getRowCount();
+ this.cellCount = r.getCellCount();
+ }
+
+ public long getRowCount() {
+ return rowCount;
+ }
+
+ public long getCellCount() {
+ return cellCount;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder(35);
+ return builder.append("rowCount=")
+ .append(rowCount)
+ .append(", cellCount=")
+ .append(cellCount)
+ .toString();
+ }
+ }
+}
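For context, a minimal client-side sketch of driving the new endpoint-based export through the Export.run(...) API added above. The table name and output directory are placeholders, and it assumes the Export coprocessor is already loaded on the target table's regions (the output directory must not exist yet, per checkDir):

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.Export;

public class ExportEndpointSketch {
  public static void main(String[] args) throws Throwable {
    Configuration conf = HBaseConfiguration.create();
    TableName table = TableName.valueOf("my_table");   // placeholder table name
    Path output = new Path("/tmp/my_table_export");    // placeholder output dir (must not exist)
    // Invokes the ExportService endpoint on every region covered by the scan
    // and collects one Response (row/cell counts) per region start key.
    Map<byte[], Export.Response> responses = Export.run(conf, table, new Scan(), output);
    long rows = 0;
    long cells = 0;
    for (Export.Response r : responses.values()) {
      rows += r.getRowCount();
      cells += r.getCellCount();
    }
    System.out.println("exported rows=" + rows + ", cells=" + cells);
  }
}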
http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-endpoint/src/main/protobuf/Export.proto
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/protobuf/Export.proto b/hbase-endpoint/src/main/protobuf/Export.proto
new file mode 100644
index 0000000..5e6c262
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/Export.proto
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "ExportProtos";
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+option java_generic_services = true;
+
+import "Client.proto";
+
+service ExportService {
+ rpc export (ExportRequest) returns (ExportResponse);
+}
+
+message ExportRequest {
+ required Scan scan = 1;
+ required string outputPath = 2;
+ optional bool compressed = 3 [default = false];
+ optional string compressType = 4;
+ optional string compressCodec = 5;
+ optional DelegationToken fsToken = 6;
+}
+message ExportResponse {
+ required uint64 rowCount = 1;
+ required uint64 cellCount = 2;
+}
+
http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestImportExport.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestImportExport.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestImportExport.java
new file mode 100644
index 0000000..e0d4fd2
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestImportExport.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MediumTests.class})
+public class TestImportExport extends org.apache.hadoop.hbase.mapreduce.TestImportExport {
+
+ @BeforeClass
+ public static void beforeClass() throws Throwable {
+ UTIL.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ org.apache.hadoop.hbase.coprocessor.Export.class.getName());
+ org.apache.hadoop.hbase.mapreduce.TestImportExport.beforeClass();
+ }
+
+ @Override
+ protected boolean runExport(String[] args) throws Throwable {
+ Export.run(new Configuration(UTIL.getConfiguration()), args);
+ return true;
+ }
+
+ @Override
+ protected void runExportMain(String[] args) throws Throwable {
+ Export.main(args);
+ }
+
+ /**
+ * Skip this test because it is unrelated to the coprocessor.Export.
+ */
+ @Test
+ @Ignore
+ public void testImport94Table() throws Throwable {
+ }
+}
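The test above loads the endpoint cluster-wide via hbase.coprocessor.region.classes (CoprocessorHost.REGION_COPROCESSOR_CONF_KEY). A rough per-table alternative sketch, assuming TableDescriptorBuilder#addCoprocessor is available and using placeholder table/family names:

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class CreateExportEnabledTable {
  // Creates a table (placeholder names) with the endpoint-based Export attached
  // to its regions, as an alternative to the cluster-wide
  // hbase.coprocessor.region.classes setting used by the test above.
  static void create(Admin admin) throws IOException {
    TableDescriptor desc = TableDescriptorBuilder
        .newBuilder(TableName.valueOf("my_table"))
        .addColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("fam")))
        .addCoprocessor(org.apache.hadoop.hbase.coprocessor.Export.class.getName())
        .build();
    admin.createTable(desc);
  }
}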
http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java
new file mode 100644
index 0000000..66d99dd
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import com.google.protobuf.ServiceException;
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil;
+import org.apache.hadoop.hbase.mapreduce.ExportUtils;
+import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.mapreduce.Import;
+import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessControlConstants;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil.AccessTestAction;
+import org.apache.hadoop.hbase.security.access.Permission;
+import org.apache.hadoop.hbase.security.visibility.Authorizations;
+import org.apache.hadoop.hbase.security.visibility.CellVisibility;
+import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
+import org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
+import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.After;
+import org.junit.AfterClass;
+import static org.junit.Assert.assertEquals;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({MediumTests.class})
+public class TestSecureExport {
+ private static final Log LOG = LogFactory.getLog(TestSecureExport.class);
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private static MiniKdc KDC;
+ private static final File KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath());
+ private static String USERNAME;
+ private static String SERVER_PRINCIPAL;
+ private static String HTTP_PRINCIPAL;
+ private static final String FAMILYA_STRING = "fma";
+ private static final String FAMILYB_STRING = "fmb";
+ private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
+ private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
+ private static final byte[] ROW1 = Bytes.toBytes("row1");
+ private static final byte[] ROW2 = Bytes.toBytes("row2");
+ private static final byte[] ROW3 = Bytes.toBytes("row3");
+ private static final byte[] QUAL = Bytes.toBytes("qual");
+ private static final String LOCALHOST = "localhost";
+ private static final long NOW = System.currentTimeMillis();
+ // user granted with all global permission
+ private static final String USER_ADMIN = "admin";
+ // user is table owner. will have all permissions on table
+ private static final String USER_OWNER = "owner";
+ // user with rx permissions.
+ private static final String USER_RX = "rxuser";
+ // user with exe-only permissions.
+ private static final String USER_XO = "xouser";
+ // user with read-only permissions.
+ private static final String USER_RO = "rouser";
+ // user with no permissions
+ private static final String USER_NONE = "noneuser";
+ private static final String PRIVATE = "private";
+ private static final String CONFIDENTIAL = "confidential";
+ private static final String SECRET = "secret";
+ private static final String TOPSECRET = "topsecret";
+ @Rule
+ public final TestName name = new TestName();
+ private static void setUpKdcServer() throws Exception {
+ Properties conf = MiniKdc.createConf();
+ conf.put(MiniKdc.DEBUG, true);
+ File kdcFile = new File(UTIL.getDataTestDir("kdc").toUri().getPath());
+ KDC = new MiniKdc(conf, kdcFile);
+ KDC.start();
+ USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
+ SERVER_PRINCIPAL = USERNAME + "/" + LOCALHOST;
+ HTTP_PRINCIPAL = "HTTP/" + LOCALHOST;
+ KDC.createPrincipal(KEYTAB_FILE,
+ SERVER_PRINCIPAL,
+ HTTP_PRINCIPAL,
+ USER_ADMIN + "/" + LOCALHOST,
+ USER_OWNER + "/" + LOCALHOST,
+ USER_RX + "/" + LOCALHOST,
+ USER_RO + "/" + LOCALHOST,
+ USER_XO + "/" + LOCALHOST,
+ USER_NONE + "/" + LOCALHOST);
+ }
+ private static User getUserByLogin(final String user) throws IOException {
+ return User.create(UserGroupInformation.loginUserFromKeytabAndReturnUGI(getPrinciple(user), KEYTAB_FILE.getAbsolutePath()));
+ }
+ private static String getPrinciple(final String user) {
+ return user + "/" + LOCALHOST + "@" + KDC.getRealm();
+ }
+ private static void setUpClusterKdc() throws Exception {
+ HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
+ HBaseKerberosUtils.setPrincipalForTesting(SERVER_PRINCIPAL + "@" + KDC.getRealm());
+ HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration());
+ // if we drop support for hadoop-2.4.0 and hadoop-2.4.1,
+ // the following key should be changed.
+ // 1) DFS_NAMENODE_USER_NAME_KEY -> DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY
+ // 2) DFS_DATANODE_USER_NAME_KEY -> DFS_DATANODE_KERBEROS_PRINCIPAL_KEY
+ UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, SERVER_PRINCIPAL + "@" + KDC.getRealm());
+ UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, SERVER_PRINCIPAL + "@" + KDC.getRealm());
+ UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
+ UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
+ // set yarn principal
+ UTIL.getConfiguration().set(YarnConfiguration.RM_PRINCIPAL, SERVER_PRINCIPAL + "@" + KDC.getRealm());
+ UTIL.getConfiguration().set(YarnConfiguration.NM_PRINCIPAL, SERVER_PRINCIPAL + "@" + KDC.getRealm());
+ UTIL.getConfiguration().set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, HTTP_PRINCIPAL + "@" + KDC.getRealm());
+ UTIL.getConfiguration().setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+ UTIL.getConfiguration().set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+ UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, LOCALHOST + ":0");
+ UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, LOCALHOST + ":0");
+
+ File keystoresDir = new File(UTIL.getDataTestDir("keystore").toUri().getPath());
+ keystoresDir.mkdirs();
+ String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestSecureExport.class);
+ KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, UTIL.getConfiguration(), false);
+
+ UTIL.getConfiguration().setBoolean("ignore.secure.ports.for.testing", true);
+ UserGroupInformation.setConfiguration(UTIL.getConfiguration());
+ UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, UTIL.getConfiguration().get(
+ CoprocessorHost.REGION_COPROCESSOR_CONF_KEY) + "," + Export.class.getName());
+ }
+ private static void addLabels(final Configuration conf, final List<String> users, final List<String> labels) throws Exception {
+ PrivilegedExceptionAction<VisibilityLabelsProtos.VisibilityLabelsResponse> action
+ = () -> {
+ try (Connection conn = ConnectionFactory.createConnection(conf)) {
+ VisibilityClient.addLabels(conn, labels.toArray(new String[labels.size()]));
+ for (String user : users) {
+ VisibilityClient.setAuths(conn, labels.toArray(new String[labels.size()]), user);
+ }
+ } catch (Throwable t) {
+ throw new IOException(t);
+ }
+ return null;
+ };
+ getUserByLogin(USER_ADMIN).runAs(action);
+ }
+
+ @Before
+ public void announce() {
+ LOG.info("Running " + name.getMethodName());
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ }
+ private static void clearOutput(Path path) throws IOException {
+ FileSystem fs = path.getFileSystem(UTIL.getConfiguration());
+ if (fs.exists(path)) {
+ assertEquals(true, fs.delete(path, true));
+ }
+ }
+ /**
+ * Sets up security first so that the correct default realm is used.
+ * @throws Exception
+ */
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ UserProvider.setUserProviderForTesting(UTIL.getConfiguration(), HadoopSecurityEnabledUserProviderForTesting.class);
+ setUpKdcServer();
+ SecureTestUtil.enableSecurity(UTIL.getConfiguration());
+ UTIL.getConfiguration().setBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY, true);
+ VisibilityTestUtil.enableVisiblityLabels(UTIL.getConfiguration());
+ SecureTestUtil.verifyConfiguration(UTIL.getConfiguration());
+ setUpClusterKdc();
+ UTIL.startMiniCluster();
+ UTIL.waitUntilAllRegionsAssigned(AccessControlLists.ACL_TABLE_NAME);
+ UTIL.waitUntilAllRegionsAssigned(VisibilityConstants.LABELS_TABLE_NAME);
+ UTIL.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME, 50000);
+ UTIL.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME, 50000);
+ SecureTestUtil.grantGlobal(UTIL, USER_ADMIN,
+ Permission.Action.ADMIN,
+ Permission.Action.CREATE,
+ Permission.Action.EXEC,
+ Permission.Action.READ,
+ Permission.Action.WRITE);
+ addLabels(UTIL.getConfiguration(), Arrays.asList(USER_OWNER),
+ Arrays.asList(PRIVATE, CONFIDENTIAL, SECRET, TOPSECRET));
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ if (KDC != null) {
+ KDC.stop();
+ }
+ UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Test the ExportEndpoint's access levels. The {@link Export} test is ignored
+ * since the access exceptions cannot be collected from the mappers.
+ *
+ * @throws java.io.IOException
+ */
+ @Test
+ public void testAccessCase() throws IOException, Throwable {
+ final String exportTable = name.getMethodName();
+ TableDescriptor exportHtd = TableDescriptorBuilder
+ .newBuilder(TableName.valueOf(name.getMethodName()))
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYA))
+ .setOwnerString(USER_OWNER)
+ .build();
+ SecureTestUtil.createTable(UTIL, exportHtd, new byte[][]{Bytes.toBytes("s")});
+ SecureTestUtil.grantOnTable(UTIL, USER_RO,
+ TableName.valueOf(exportTable), null, null,
+ Permission.Action.READ);
+ SecureTestUtil.grantOnTable(UTIL, USER_RX,
+ TableName.valueOf(exportTable), null, null,
+ Permission.Action.READ,
+ Permission.Action.EXEC);
+ SecureTestUtil.grantOnTable(UTIL, USER_XO,
+ TableName.valueOf(exportTable), null, null,
+ Permission.Action.EXEC);
+ assertEquals(4, AccessControlLists.getTablePermissions(UTIL.getConfiguration(),
+ TableName.valueOf(exportTable)).size());
+ AccessTestAction putAction = () -> {
+ Put p = new Put(ROW1);
+ p.addColumn(FAMILYA, Bytes.toBytes("qual_0"), NOW, QUAL);
+ p.addColumn(FAMILYA, Bytes.toBytes("qual_1"), NOW, QUAL);
+ try (Connection conn = ConnectionFactory.createConnection(UTIL.getConfiguration());
+ Table t = conn.getTable(TableName.valueOf(exportTable))) {
+ t.put(p);
+ }
+ return null;
+ };
+ // no hdfs access.
+ SecureTestUtil.verifyAllowed(putAction,
+ getUserByLogin(USER_ADMIN),
+ getUserByLogin(USER_OWNER));
+ SecureTestUtil.verifyDenied(putAction,
+ getUserByLogin(USER_RO),
+ getUserByLogin(USER_XO),
+ getUserByLogin(USER_RX),
+ getUserByLogin(USER_NONE));
+
+ final FileSystem fs = UTIL.getDFSCluster().getFileSystem();
+ final Path openDir = fs.makeQualified(new Path("testAccessCase"));
+ fs.mkdirs(openDir);
+ fs.setPermission(openDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+ final Path output = fs.makeQualified(new Path(openDir, "output"));
+ AccessTestAction exportAction = () -> {
+ try {
+ String[] args = new String[]{exportTable, output.toString()};
+ Map<byte[], Export.Response> result
+ = Export.run(new Configuration(UTIL.getConfiguration()), args);
+ long rowCount = 0;
+ long cellCount = 0;
+ for (Export.Response r : result.values()) {
+ rowCount += r.getRowCount();
+ cellCount += r.getCellCount();
+ }
+ assertEquals(1, rowCount);
+ assertEquals(2, cellCount);
+ return null;
+ } catch (ServiceException | IOException ex) {
+ throw ex;
+ } catch (Throwable ex) {
+ LOG.error(ex);
+ throw new Exception(ex);
+ } finally {
+ clearOutput(output);
+ }
+ };
+ SecureTestUtil.verifyDenied(exportAction,
+ getUserByLogin(USER_RO),
+ getUserByLogin(USER_XO),
+ getUserByLogin(USER_NONE));
+ SecureTestUtil.verifyAllowed(exportAction,
+ getUserByLogin(USER_ADMIN),
+ getUserByLogin(USER_OWNER),
+ getUserByLogin(USER_RX));
+ AccessTestAction deleteAction = () -> {
+ UTIL.deleteTable(TableName.valueOf(exportTable));
+ return null;
+ };
+ SecureTestUtil.verifyAllowed(deleteAction, getUserByLogin(USER_OWNER));
+ fs.delete(openDir, true);
+ }
+ @Test
+ public void testVisibilityLabels() throws IOException, Throwable {
+ final String exportTable = name.getMethodName() + "_export";
+ final String importTable = name.getMethodName() + "_import";
+ final TableDescriptor exportHtd = TableDescriptorBuilder.newBuilder(TableName.valueOf(exportTable))
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYA))
+ .setOwnerString(USER_OWNER)
+ .build();
+ SecureTestUtil.createTable(UTIL, exportHtd, new byte[][]{Bytes.toBytes("s")});
+ AccessTestAction putAction = () -> {
+ Put p1 = new Put(ROW1);
+ p1.addColumn(FAMILYA, QUAL, NOW, QUAL);
+ p1.setCellVisibility(new CellVisibility(SECRET));
+ Put p2 = new Put(ROW2);
+ p2.addColumn(FAMILYA, QUAL, NOW, QUAL);
+ p2.setCellVisibility(new CellVisibility(PRIVATE + " & " + CONFIDENTIAL));
+ Put p3 = new Put(ROW3);
+ p3.addColumn(FAMILYA, QUAL, NOW, QUAL);
+ p3.setCellVisibility(new CellVisibility("!" + CONFIDENTIAL + " & " + TOPSECRET));
+ try (Connection conn = ConnectionFactory.createConnection(UTIL.getConfiguration());
+ Table t = conn.getTable(TableName.valueOf(exportTable))) {
+ t.put(p1);
+ t.put(p2);
+ t.put(p3);
+ }
+ return null;
+ };
+ SecureTestUtil.verifyAllowed(putAction, getUserByLogin(USER_OWNER));
+ List<Pair<List<String>, Integer>> labelsAndRowCounts = new LinkedList<>();
+ labelsAndRowCounts.add(new Pair<>(Arrays.asList(SECRET), 1));
+ labelsAndRowCounts.add(new Pair<>(Arrays.asList(PRIVATE, CONFIDENTIAL), 1));
+ labelsAndRowCounts.add(new Pair<>(Arrays.asList(TOPSECRET), 1));
+ labelsAndRowCounts.add(new Pair<>(Arrays.asList(TOPSECRET, CONFIDENTIAL), 0));
+ labelsAndRowCounts.add(new Pair<>(Arrays.asList(TOPSECRET, CONFIDENTIAL, PRIVATE, SECRET), 2));
+ for (final Pair<List<String>, Integer> labelsAndRowCount : labelsAndRowCounts) {
+ final List<String> labels = labelsAndRowCount.getFirst();
+ final int rowCount = labelsAndRowCount.getSecond();
+ // create an open-permission directory.
+ final Path openDir = new Path("testAccessCase");
+ final FileSystem fs = openDir.getFileSystem(UTIL.getConfiguration());
+ fs.mkdirs(openDir);
+ fs.setPermission(openDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+ final Path output = fs.makeQualified(new Path(openDir, "output"));
+ AccessTestAction exportAction = () -> {
+ StringBuilder buf = new StringBuilder();
+ labels.forEach(v -> buf.append(v).append(","));
+ buf.deleteCharAt(buf.length() - 1);
+ try {
+ String[] args = new String[]{
+ "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + buf.toString(),
+ exportTable,
+ output.toString(),};
+ Export.run(new Configuration(UTIL.getConfiguration()), args);
+ return null;
+ } catch (ServiceException | IOException ex) {
+ throw ex;
+ } catch (Throwable ex) {
+ throw new Exception(ex);
+ }
+ };
+ SecureTestUtil.verifyAllowed(exportAction, getUserByLogin(USER_OWNER));
+ final TableDescriptor importHtd = TableDescriptorBuilder.newBuilder(TableName.valueOf(importTable))
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYB))
+ .setOwnerString(USER_OWNER)
+ .build();
+ SecureTestUtil.createTable(UTIL, importHtd, new byte[][]{Bytes.toBytes("s")});
+ AccessTestAction importAction = () -> {
+ String[] args = new String[]{
+ "-D" + Import.CF_RENAME_PROP + "=" + FAMILYA_STRING + ":" + FAMILYB_STRING,
+ importTable,
+ output.toString()
+ };
+ assertEquals(0, ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args));
+ return null;
+ };
+ SecureTestUtil.verifyAllowed(importAction, getUserByLogin(USER_OWNER));
+ AccessTestAction scanAction = () -> {
+ Scan scan = new Scan();
+ scan.setAuthorizations(new Authorizations(labels));
+ try (Connection conn = ConnectionFactory.createConnection(UTIL.getConfiguration());
+ Table table = conn.getTable(importHtd.getTableName());
+ ResultScanner scanner = table.getScanner(scan)) {
+ int count = 0;
+ for (Result r : scanner) {
+ ++count;
+ }
+ assertEquals(rowCount, count);
+ }
+ return null;
+ };
+ SecureTestUtil.verifyAllowed(scanAction, getUserByLogin(USER_OWNER));
+ AccessTestAction deleteAction = () -> {
+ UTIL.deleteTable(importHtd.getTableName());
+ return null;
+ };
+ SecureTestUtil.verifyAllowed(deleteAction, getUserByLogin(USER_OWNER));
+ clearOutput(output);
+ }
+ AccessTestAction deleteAction = () -> {
+ UTIL.deleteTable(exportHtd.getTableName());
+ return null;
+ };
+ SecureTestUtil.verifyAllowed(deleteAction, getUserByLogin(USER_OWNER));
+ }
+}
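Note: the access pattern exercised by testAccessCase above is that the endpoint-based
export needs both READ and EXEC on the table (USER_RX is allowed, while USER_RO and
USER_XO are denied). A minimal sketch of granting that combination outside the test,
assuming the standard AccessControlClient API; the table and user names are illustrative
only and not part of this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.Permission;

public class GrantEndpointExportAccessSketch {
  public static void main(String[] args) throws Throwable {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      // READ alone or EXEC alone is rejected by the endpoint export;
      // the user needs both actions on the exported table.
      AccessControlClient.grant(conn, TableName.valueOf("some_table"), "some_user",
          null, null, Permission.Action.READ, Permission.Action.EXEC);
    }
  }
}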
http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/ExportEndpointExample.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/ExportEndpointExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/ExportEndpointExample.java
new file mode 100644
index 0000000..cc06844
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/ExportEndpointExample.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.example;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.Export;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A simple example of how to use {@link org.apache.hadoop.hbase.coprocessor.Export}.
+ *
+ * <p>
+ * For the protocol buffer definition of the ExportService, see the source file located under
+ * hbase-endpoint/src/main/protobuf/Export.proto.
+ * </p>
+ */
+public class ExportEndpointExample {
+
+ public static void main(String[] args) throws Throwable {
+ int rowCount = 100;
+ byte[] family = Bytes.toBytes("family");
+ Configuration conf = HBaseConfiguration.create();
+ TableName tableName = TableName.valueOf("ExportEndpointExample");
+ try (Connection con = ConnectionFactory.createConnection(conf);
+ Admin admin = con.getAdmin()) {
+ TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
+ // MUST mount the export endpoint
+ .addCoprocessor(Export.class.getName())
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.of(family))
+ .build();
+ admin.createTable(desc);
+
+ List<Put> puts = new ArrayList<>(rowCount);
+ for (int row = 0; row != rowCount; ++row) {
+ byte[] bs = Bytes.toBytes(row);
+ Put put = new Put(bs);
+ put.addColumn(family, bs, bs);
+ puts.add(put);
+ }
+ try (Table table = con.getTable(tableName)) {
+ table.put(puts);
+ }
+
+ Path output = new Path("/tmp/ExportEndpointExample_output");
+ Scan scan = new Scan();
+ Map<byte[], Export.Response> result = Export.run(conf, tableName, scan, output);
+ final long totalOutputRows = result.values().stream().mapToLong(v -> v.getRowCount()).sum();
+ final long totalOutputCells = result.values().stream().mapToLong(v -> v.getCellCount()).sum();
+ System.out.println("table:" + tableName);
+ System.out.println("output:" + output);
+ System.out.println("total rows:" + totalOutputRows);
+ System.out.println("total cells:" + totalOutputCells);
+ }
+ }
+}
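Note: the output written by the endpoint can be loaded back into a table with the
existing Import tool, as the testVisibilityLabels case above does. A minimal sketch,
assuming the target table already exists; both arguments are illustrative and simply
reuse the output path from the example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.util.ToolRunner;

public class ImportEndpointOutputSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Arguments are <tablename> <inputdir>.
    String[] importArgs = new String[]{"ExportEndpointExample",
        "/tmp/ExportEndpointExample_output"};
    System.exit(ToolRunner.run(conf, new Import(), importArgs));
  }
}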
http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
index de6cf3a..b2dc254 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
@@ -20,23 +20,17 @@ package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.filter.RegexStringComparator;
-import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ArrayUtils;
+import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
@@ -50,12 +44,8 @@ import org.apache.hadoop.util.ToolRunner;
*/
@InterfaceAudience.Public
public class Export extends Configured implements Tool {
- private static final Log LOG = LogFactory.getLog(Export.class);
- final static String NAME = "export";
- final static String RAW_SCAN = "hbase.mapreduce.include.deleted.rows";
- final static String EXPORT_BATCHING = "hbase.export.scanner.batch";
-
- private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+ static final String NAME = "export";
+ static final String JOB_NAME_CONF_KEY = "mapreduce.job.name";
/**
* Sets up the actual job.
@@ -67,13 +57,14 @@ public class Export extends Configured implements Tool {
*/
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
- String tableName = args[0];
- Path outputDir = new Path(args[1]);
+ Triple<TableName, Scan, Path> arguments = ExportUtils.getArgumentsFromCommandLine(conf, args);
+ String tableName = arguments.getFirst().getNameAsString();
+ Path outputDir = arguments.getThird();
Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
job.setJobName(NAME + "_" + tableName);
job.setJarByClass(Export.class);
// Set optional scan parameters
- Scan s = getConfiguredScanForJob(conf, args);
+ Scan s = arguments.getSecond();
IdentityTableMapper.initJob(tableName, s, IdentityTableMapper.class, job);
// No reducers. Just write straight to output files.
job.setNumReduceTasks(0);
@@ -84,101 +75,15 @@ public class Export extends Configured implements Tool {
return job;
}
- private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
- Scan s = new Scan();
- // Optional arguments.
- // Set Scan Versions
- int versions = args.length > 2? Integer.parseInt(args[2]): 1;
- s.setMaxVersions(versions);
- // Set Scan Range
- long startTime = args.length > 3? Long.parseLong(args[3]): 0L;
- long endTime = args.length > 4? Long.parseLong(args[4]): Long.MAX_VALUE;
- s.setTimeRange(startTime, endTime);
- // Set cache blocks
- s.setCacheBlocks(false);
- // set Start and Stop row
- if (conf.get(TableInputFormat.SCAN_ROW_START) != null) {
- s.setStartRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_START)));
- }
- if (conf.get(TableInputFormat.SCAN_ROW_STOP) != null) {
- s.setStopRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_STOP)));
- }
- // Set Scan Column Family
- boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN));
- if (raw) {
- s.setRaw(raw);
- }
- for (String columnFamily : conf.getTrimmedStrings(TableInputFormat.SCAN_COLUMN_FAMILY)) {
- s.addFamily(Bytes.toBytes(columnFamily));
- }
- // Set RowFilter or Prefix Filter if applicable.
- Filter exportFilter = getExportFilter(args);
- if (exportFilter!= null) {
- LOG.info("Setting Scan Filter for Export.");
- s.setFilter(exportFilter);
- }
-
- int batching = conf.getInt(EXPORT_BATCHING, -1);
- if (batching != -1){
- try {
- s.setBatch(batching);
- } catch (IncompatibleFilterException e) {
- LOG.error("Batching could not be set", e);
- }
- }
- LOG.info("versions=" + versions + ", starttime=" + startTime +
- ", endtime=" + endTime + ", keepDeletedCells=" + raw);
- return s;
- }
-
- private static Filter getExportFilter(String[] args) {
- Filter exportFilter = null;
- String filterCriteria = (args.length > 5) ? args[5]: null;
- if (filterCriteria == null) return null;
- if (filterCriteria.startsWith("^")) {
- String regexPattern = filterCriteria.substring(1, filterCriteria.length());
- exportFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regexPattern));
- } else {
- exportFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria));
- }
- return exportFilter;
- }
-
- /*
- * @param errorMsg Error message. Can be null.
- */
- private static void usage(final String errorMsg) {
- if (errorMsg != null && errorMsg.length() > 0) {
- System.err.println("ERROR: " + errorMsg);
- }
- System.err.println("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
- "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]\n");
- System.err.println(" Note: -D properties will be applied to the conf used. ");
- System.err.println(" For example: ");
- System.err.println(" -D mapreduce.output.fileoutputformat.compress=true");
- System.err.println(" -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec");
- System.err.println(" -D mapreduce.output.fileoutputformat.compress.type=BLOCK");
- System.err.println(" Additionally, the following SCAN properties can be specified");
- System.err.println(" to control/limit what is exported..");
- System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ...");
- System.err.println(" -D " + RAW_SCAN + "=true");
- System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "=<ROWSTART>");
- System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "=<ROWSTOP>");
- System.err.println(" -D " + JOB_NAME_CONF_KEY
- + "=jobName - use the specified mapreduce job name for the export");
- System.err.println("For performance consider the following properties:\n"
- + " -Dhbase.client.scanner.caching=100\n"
- + " -Dmapreduce.map.speculative=false\n"
- + " -Dmapreduce.reduce.speculative=false");
- System.err.println("For tables with very wide rows consider setting the batch size as below:\n"
- + " -D" + EXPORT_BATCHING + "=10");
- }
-
-
@Override
public int run(String[] args) throws Exception {
- if (args.length < 2) {
- usage("Wrong number of arguments: " + args.length);
+ if (!ExportUtils.isValidArguements(args)) {
+ ExportUtils.usage("Wrong number of arguments: " + ArrayUtils.length(args));
+ System.err.println(" -D " + JOB_NAME_CONF_KEY
+ + "=jobName - use the specified mapreduce job name for the export");
+ System.err.println("For MR performance consider the following properties:");
+ System.err.println(" -D mapreduce.map.speculative=false");
+ System.err.println(" -D mapreduce.reduce.speculative=false");
return -1;
}
Job job = createSubmittableJob(getConf(), args);
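Note: since Export still extends Configured and implements Tool, it can also be driven
programmatically through ToolRunner with the same -D properties the usage text lists.
A minimal sketch; the table name, output directory and property values are illustrative
only and not part of this patch:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.Export;
import org.apache.hadoop.util.ToolRunner;

public class RunMapReduceExportSketch {
  public static void main(String[] args) throws Exception {
    String[] exportArgs = new String[]{
        "-Dhbase.export.scanner.batch=10",     // EXPORT_BATCHING
        "-Dmapreduce.map.speculative=false",   // from the usage hints above
        "some_table",
        "/tmp/some_table_export"
    };
    System.exit(ToolRunner.run(HBaseConfiguration.create(), new Export(), exportArgs));
  }
}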
http://git-wip-us.apache.org/repos/asf/hbase/blob/74659730/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExportUtils.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExportUtils.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExportUtils.java
new file mode 100644
index 0000000..9cc2a80
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExportUtils.java
@@ -0,0 +1,175 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.security.visibility.Authorizations;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Triple;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Helper methods shared by {@link org.apache.hadoop.hbase.mapreduce.Export}
+ * and org.apache.hadoop.hbase.coprocessor.Export (in hbase-endpoint).
+ */
+@InterfaceAudience.Private
+public final class ExportUtils {
+ private static final Log LOG = LogFactory.getLog(ExportUtils.class);
+ public static final String RAW_SCAN = "hbase.mapreduce.include.deleted.rows";
+ public static final String EXPORT_BATCHING = "hbase.export.scanner.batch";
+ public static final String EXPORT_CACHING = "hbase.export.scanner.caching";
+ public static final String EXPORT_VISIBILITY_LABELS = "hbase.export.visibility.labels";
+ /**
+ * Common usage shared by the export tools.
+ * @param errorMsg Error message. Can be null.
+ */
+ public static void usage(final String errorMsg) {
+ if (errorMsg != null && errorMsg.length() > 0) {
+ System.err.println("ERROR: " + errorMsg);
+ }
+ System.err.println("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
+ "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]\n");
+ System.err.println(" Note: -D properties will be applied to the conf used. ");
+ System.err.println(" For example: ");
+ System.err.println(" -D " + FileOutputFormat.COMPRESS + "=true");
+ System.err.println(" -D " + FileOutputFormat.COMPRESS_CODEC + "=org.apache.hadoop.io.compress.GzipCodec");
+ System.err.println(" -D " + FileOutputFormat.COMPRESS_TYPE + "=BLOCK");
+ System.err.println(" Additionally, the following SCAN properties can be specified");
+ System.err.println(" to control/limit what is exported..");
+ System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ...");
+ System.err.println(" -D " + RAW_SCAN + "=true");
+ System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "=<ROWSTART>");
+ System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "=<ROWSTOP>");
+ System.err.println(" -D " + HConstants.HBASE_CLIENT_SCANNER_CACHING + "=100");
+ System.err.println(" -D " + EXPORT_VISIBILITY_LABELS + "=<labels>");
+ System.err.println("For tables with very wide rows consider setting the batch size as below:\n"
+ + " -D " + EXPORT_BATCHING + "=10\n"
+ + " -D " + EXPORT_CACHING + "=100");
+ }
+
+ private static Filter getExportFilter(String[] args) {
+ Filter exportFilter;
+ String filterCriteria = (args.length > 5) ? args[5]: null;
+ if (filterCriteria == null) return null;
+ if (filterCriteria.startsWith("^")) {
+ String regexPattern = filterCriteria.substring(1, filterCriteria.length());
+ exportFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regexPattern));
+ } else {
+ exportFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria));
+ }
+ return exportFilter;
+ }
+
+ public static boolean isValidArguements(String[] args) {
+ return args != null && args.length >= 2;
+ }
+
+ public static Triple<TableName, Scan, Path> getArgumentsFromCommandLine(
+ Configuration conf, String[] args) throws IOException {
+ if (!isValidArguements(args)) {
+ return null;
+ }
+ return new Triple<>(TableName.valueOf(args[0]), getScanFromCommandLine(conf, args), new Path(args[1]));
+ }
+
+ @VisibleForTesting
+ static Scan getScanFromCommandLine(Configuration conf, String[] args) throws IOException {
+ Scan s = new Scan();
+ // Optional arguments.
+ // Set Scan Versions
+ int versions = args.length > 2? Integer.parseInt(args[2]): 1;
+ s.setMaxVersions(versions);
+ // Set Scan Range
+ long startTime = args.length > 3? Long.parseLong(args[3]): 0L;
+ long endTime = args.length > 4? Long.parseLong(args[4]): Long.MAX_VALUE;
+ s.setTimeRange(startTime, endTime);
+ // Set cache blocks
+ s.setCacheBlocks(false);
+ // set Start and Stop row
+ if (conf.get(TableInputFormat.SCAN_ROW_START) != null) {
+ s.setStartRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_START)));
+ }
+ if (conf.get(TableInputFormat.SCAN_ROW_STOP) != null) {
+ s.setStopRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_STOP)));
+ }
+ // Set Scan Column Family
+ boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN));
+ if (raw) {
+ s.setRaw(raw);
+ }
+ for (String columnFamily : conf.getTrimmedStrings(TableInputFormat.SCAN_COLUMN_FAMILY)) {
+ s.addFamily(Bytes.toBytes(columnFamily));
+ }
+ // Set RowFilter or Prefix Filter if applicable.
+ Filter exportFilter = getExportFilter(args);
+ if (exportFilter!= null) {
+ LOG.info("Setting Scan Filter for Export.");
+ s.setFilter(exportFilter);
+ }
+ List<String> labels = null;
+ if (conf.get(EXPORT_VISIBILITY_LABELS) != null) {
+ labels = Arrays.asList(conf.getStrings(EXPORT_VISIBILITY_LABELS));
+ if (!labels.isEmpty()) {
+ s.setAuthorizations(new Authorizations(labels));
+ }
+ }
+
+ int batching = conf.getInt(EXPORT_BATCHING, -1);
+ if (batching != -1) {
+ try {
+ s.setBatch(batching);
+ } catch (IncompatibleFilterException e) {
+ LOG.error("Batching could not be set", e);
+ }
+ }
+
+ int caching = conf.getInt(EXPORT_CACHING, 100);
+ if (caching != -1) {
+ try {
+ s.setCaching(caching);
+ } catch (IncompatibleFilterException e) {
+ LOG.error("Caching could not be set", e);
+ }
+ }
+ LOG.info("versions=" + versions + ", starttime=" + startTime
+ + ", endtime=" + endTime + ", keepDeletedCells=" + raw
+ + ", visibility labels=" + labels);
+ return s;
+ }
+
+ private ExportUtils() {
+ }
+}
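Note: both export implementations now funnel their command line through
ExportUtils.getArgumentsFromCommandLine, which returns the table name, the configured
Scan and the output path as a Triple. A minimal sketch of the parsing step in isolation,
with illustrative arguments (the class is marked @InterfaceAudience.Private, so this is
for illustration only, not a supported API):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.ExportUtils;
import org.apache.hadoop.hbase.util.Triple;

public class ParseExportArgsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // <tablename> <outputdir> [<versions> [<starttime> [<endtime>]] [filter]]
    String[] cli = new String[]{"some_table", "/tmp/some_table_export", "3"};
    Triple<TableName, Scan, Path> parsed =
        ExportUtils.getArgumentsFromCommandLine(conf, cli);
    System.out.println("table=" + parsed.getFirst()
        + ", maxVersions=" + parsed.getSecond().getMaxVersions()
        + ", output=" + parsed.getThird());
  }
}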