You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2018/06/07 23:55:42 UTC

[GitHub] ctubbsii closed pull request #523: Fix Auditing for new Bulk Import / remove warnings

ctubbsii closed pull request #523: Fix Auditing for new Bulk Import / remove warnings
URL: https://github.com/apache/accumulo/pull/523
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/Bulk.java b/core/src/main/java/org/apache/accumulo/core/client/impl/Bulk.java
index 4f09689246..dcea41c913 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/Bulk.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Bulk.java
@@ -16,7 +16,6 @@
  */
 package org.apache.accumulo.core.client.impl;
 
-import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -97,7 +96,7 @@ public String toString() {
   /**
    * WARNING : do not change this class, its used for serialization to Json
    */
-  public static class FileInfo implements Serializable {
+  public static class FileInfo {
     final String name;
     final long estSize;
     final long estEntries;
@@ -151,7 +150,7 @@ public int hashCode() {
     }
   }
 
-  public static class Files implements Iterable<FileInfo>, Serializable {
+  public static class Files implements Iterable<FileInfo> {
     Map<String,FileInfo> files = new HashMap<>();
 
     public Files(Collection<FileInfo> files) {
diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
index 61f10b91c3..a6473a9f40 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
@@ -61,6 +61,7 @@
 import org.apache.accumulo.core.client.admin.ActiveScan;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TableOperations.ImportExecutorOptions;
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.client.impl.ClientConfConverter;
 import org.apache.accumulo.core.client.impl.Credentials;
@@ -1767,8 +1768,13 @@ public void importDirectory(ByteBuffer login, String tableName, String importDir
       org.apache.accumulo.proxy.thrift.AccumuloException,
       org.apache.accumulo.proxy.thrift.AccumuloSecurityException, TException {
     try {
-      getConnector(login).tableOperations().importDirectory(tableName, importDir, failureDir,
-          setTime);
+      ImportExecutorOptions loader = getConnector(login).tableOperations().addFilesTo(tableName)
+          .from(importDir);
+      if (setTime) {
+        loader.settingLogicalTime().load();
+      } else {
+        loader.load();
+      }
     } catch (Exception e) {
       handleExceptionTNF(e);
     }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/deprecated/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
similarity index 99%
rename from server/base/src/main/java/org/apache/accumulo/server/client/deprecated/BulkImporter.java
rename to server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index b5d2a06f41..80df178b99 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/deprecated/BulkImporter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.accumulo.server.client.deprecated;
+package org.apache.accumulo.server.client;
 
 import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 
@@ -75,12 +75,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Deprecated
 public class BulkImporter {
 
   private static final Logger log = LoggerFactory.getLogger(BulkImporter.class);
 
-  @Deprecated
   public static List<String> bulkLoad(ClientContext context, long tid, String tableId,
       List<String> files, String errorDir, boolean setTime) throws IOException, AccumuloException,
       AccumuloSecurityException, ThriftTableOperationException {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
index 55ae999de9..d63e5fc2f3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
@@ -60,7 +60,6 @@
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.client.deprecated.BulkImporter;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
index 15c2b96ee0..44ab47eddf 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
@@ -419,7 +419,7 @@ public boolean canDeleteRange(TCredentials c, Table.ID tableId, String tableName
   public boolean canBulkImport(TCredentials c, Table.ID tableId, String tableName, String dir,
       String failDir, Namespace.ID namespaceId) throws ThriftSecurityException {
     try {
-      boolean result = super.canBulkImport(c, tableId, namespaceId);
+      boolean result = super.canBulkImport(c, tableId, tableName, dir, failDir, namespaceId);
       audit(c, result, CAN_BULK_IMPORT_AUDIT_TEMPLATE, tableName, dir, failDir);
       return result;
     } catch (ThriftSecurityException ex) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
index b1692943cd..ae9d0cf9f5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
@@ -578,11 +578,6 @@ public boolean canDeleteRange(TCredentials c, Table.ID tableId, String tableName
 
   public boolean canBulkImport(TCredentials c, Table.ID tableId, String tableName, String dir,
       String failDir, Namespace.ID namespaceId) throws ThriftSecurityException {
-    return canBulkImport(c, tableId, namespaceId);
-  }
-
-  public boolean canBulkImport(TCredentials c, Table.ID tableId, Namespace.ID namespaceId)
-      throws ThriftSecurityException {
     authenticate(c);
     return hasTablePermission(c, tableId, namespaceId, TablePermission.BULK_IMPORT, false);
   }
diff --git a/server/base/src/test/java/org/apache/accumulo/server/client/deprecated/BulkImporterTest.java b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
similarity index 98%
rename from server/base/src/test/java/org/apache/accumulo/server/client/deprecated/BulkImporterTest.java
rename to server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
index 713b883d8b..8aad354b63 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/client/deprecated/BulkImporterTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.accumulo.server.client.deprecated;
+package org.apache.accumulo.server.client;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -40,7 +40,7 @@
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.server.client.deprecated.BulkImporter;
+import org.apache.accumulo.server.client.BulkImporter;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.commons.lang.NotImplementedException;
diff --git a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
index d4f04b2678..0baec1fceb 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
@@ -525,10 +525,16 @@ public String invalidMessage(String argument) {
 
         final boolean canBulkImport;
         try {
-          canBulkImport = master.security.canBulkImport(c, tableId, namespaceId);
+          String tableName = Tables.getTableName(master.getInstance(), tableId);
+          canBulkImport = master.security.canBulkImport(c, tableId, tableName, dir, null,
+              namespaceId);
         } catch (ThriftSecurityException e) {
           throwIfTableMissingSecurityException(e, tableId, "", TableOperation.BULK_IMPORT);
           throw e;
+        } catch (TableNotFoundException e) {
+          throw new ThriftTableOperationException(tableId.canonicalID(), null,
+              TableOperation.BULK_IMPORT, TableOperationExceptionType.NOTFOUND,
+              "Table no longer exists");
         }
 
         if (!canBulkImport)
diff --git a/test/src/main/java/org/apache/accumulo/test/AuditMessageIT.java b/test/src/main/java/org/apache/accumulo/test/AuditMessageIT.java
index 575d55cc6b..b98dda0e82 100644
--- a/test/src/main/java/org/apache/accumulo/test/AuditMessageIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/AuditMessageIT.java
@@ -306,8 +306,10 @@ public void testImportExportOperationsAudits()
 
     // Prepare to export the table
     File exportDir = new File(getCluster().getConfig().getDir() + "/export");
+    File exportDirBulk = new File(getCluster().getConfig().getDir() + "/export_bulk");
+    assertTrue(exportDirBulk.mkdir() || exportDirBulk.isDirectory());
 
-    auditConnector.tableOperations().offline(OLD_TEST_TABLE_NAME);
+    auditConnector.tableOperations().offline(OLD_TEST_TABLE_NAME, true);
     auditConnector.tableOperations().exportTable(OLD_TEST_TABLE_NAME, exportDir.toString());
 
     // We've exported the table metadata to the MiniAccumuloCluster root dir. Grab the .rf file path
@@ -327,14 +329,13 @@ public void testImportExportOperationsAudits()
       }
     }
     FileUtils.copyFileToDirectory(importFile, exportDir);
+    FileUtils.copyFileToDirectory(importFile, exportDirBulk);
     auditConnector.tableOperations().importTable(NEW_TEST_TABLE_NAME, exportDir.toString());
 
     // Now do a Directory (bulk) import of the same data.
     auditConnector.tableOperations().create(THIRD_TEST_TABLE_NAME);
-    File failDir = new File(exportDir + "/tmp");
-    assertTrue(failDir.mkdirs() || failDir.isDirectory());
-    auditConnector.tableOperations().importDirectory(THIRD_TEST_TABLE_NAME, exportDir.toString(),
-        failDir.toString(), false);
+    auditConnector.tableOperations().addFilesTo(THIRD_TEST_TABLE_NAME)
+        .from(exportDirBulk.toString()).load();
     auditConnector.tableOperations().online(OLD_TEST_TABLE_NAME);
 
     // Stop testing activities here
@@ -360,7 +361,7 @@ public void testImportExportOperationsAudits()
     assertEquals(1,
         findAuditMessage(auditMessages,
             String.format(AuditedSecurityOperation.CAN_BULK_IMPORT_AUDIT_TEMPLATE,
-                THIRD_TEST_TABLE_NAME, filePrefix + exportDir, filePrefix + failDir)));
+                THIRD_TEST_TABLE_NAME, filePrefix + exportDirBulk, null)));
     assertEquals(1,
         findAuditMessage(auditMessages,
             String.format(AuditedSecurityOperation.CAN_ONLINE_OFFLINE_TABLE_AUDIT_TEMPLATE,
diff --git a/test/src/main/java/org/apache/accumulo/test/BulkImportSequentialRowsIT.java b/test/src/main/java/org/apache/accumulo/test/BulkImportSequentialRowsIT.java
index 1e65184361..0ad0336399 100644
--- a/test/src/main/java/org/apache/accumulo/test/BulkImportSequentialRowsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/BulkImportSequentialRowsIT.java
@@ -74,11 +74,8 @@ public void testBulkImportFailure() throws Exception {
     Path bulk = new Path(rootPath, "bulk");
     log.info("bulk: {}", bulk);
     assertTrue(fs.mkdirs(bulk));
-    Path err = new Path(rootPath, "err");
-    log.info("err: {}", err);
 
     assertTrue(fs.mkdirs(bulk));
-    assertTrue(fs.mkdirs(err));
 
     Path rfile = new Path(bulk, "file.rf");
 
@@ -99,7 +96,7 @@ public void testBulkImportFailure() throws Exception {
     // Then import a single rfile to all the tablets, hoping that we get a failure to import because
     // of the balancer moving tablets around
     // and then we get to verify that the bug is actually fixed.
-    to.importDirectory(tableName, bulk.toString(), err.toString(), false);
+    to.addFilesTo(tableName).from(bulk.toString()).load();
 
     // The bug is that some tablets don't get imported into.
     assertEquals(NR * NV,
diff --git a/test/src/main/java/org/apache/accumulo/test/NamespacesIT.java b/test/src/main/java/org/apache/accumulo/test/NamespacesIT.java
index db27cebecc..2f037b4be2 100644
--- a/test/src/main/java/org/apache/accumulo/test/NamespacesIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/NamespacesIT.java
@@ -1245,7 +1245,7 @@ public void verifyTableOperationsExceptions() throws Exception {
             fail();
             break;
           case 18:
-            ops.importDirectory(tableName, "", "", false);
+            ops.addFilesTo(tableName).from("").load();
             fail();
             break;
           case 19:
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
index 4a33aaea18..1e27ac02ce 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
@@ -93,7 +93,7 @@ public void testBulkFile() throws Exception {
     writeData(writer3, 1000, 1999);
     writer3.close();
 
-    FunctionalTestUtils.bulkImport(c, fs, tableName, dir);
+    c.tableOperations().addFilesTo(tableName).from(dir).load();
 
     FunctionalTestUtils.checkRFiles(c, tableName, 6, 6, 1, 1);
 
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
index f2852faae8..b17d7a8b54 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
@@ -22,7 +22,6 @@
 
 import org.apache.accumulo.core.cli.ScannerOpts;
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.minicluster.ServerType;
@@ -93,7 +92,7 @@ public void testBulkSplitOptimization() throws Exception {
     FileStatus[] stats = fs.listStatus(testDir);
 
     System.out.println("Number of generated files: " + stats.length);
-    FunctionalTestUtils.bulkImport(c, fs, tableName, testDir.toString());
+    c.tableOperations().addFilesTo(tableName).from(testDir.toString()).load();
     FunctionalTestUtils.checkSplits(c, tableName, 0, 0);
     FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 100, 100);
 
@@ -118,7 +117,6 @@ public void testBulkSplitOptimization() throws Exception {
     opts.cols = 1;
     opts.setTableName(tableName);
 
-    AuthenticationToken adminToken = getAdminToken();
     opts.setClientInfo(getClientInfo());
     VerifyIngest.verifyIngest(c, opts, new ScannerOpts());
 
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index fa9dc2f412..498997670a 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -114,7 +114,7 @@ public void test() throws Exception {
     Path testrf = new Path(root, "testrf");
     FunctionalTestUtils.createRFiles(c, fs, testrf.toString(), 500000, 59, 4);
 
-    FunctionalTestUtils.bulkImport(c, fs, tableName, testrf.toString());
+    c.tableOperations().addFilesTo(tableName).from(testrf.toString()).load();
     int beforeCount = countFiles(c);
 
     final AtomicBoolean fail = new AtomicBoolean(false);
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
index eca5e0bd5f..ddeee7d49e 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
@@ -114,22 +114,6 @@ static void checkRFiles(Connector c, String tableName, int minTablets, int maxTa
     }
   }
 
-  static public void bulkImport(Connector c, FileSystem fs, String table, String dir)
-      throws Exception {
-    String failDir = dir + "_failures";
-    Path failPath = new Path(failDir);
-    fs.delete(failPath, true);
-    fs.mkdirs(failPath);
-
-    // Ensure server can read/modify files
-    c.tableOperations().importDirectory(table, dir, failDir, false);
-
-    if (fs.listStatus(failPath).length > 0) {
-      throw new Exception("Some files failed to bulk import");
-    }
-
-  }
-
   static public void checkSplits(Connector c, String table, int min, int max) throws Exception {
     Collection<Text> splits = c.tableOperations().listSplits(table);
     if (splits.size() < min || splits.size() > max) {
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java
index 873312a498..af75b902a4 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java
@@ -91,7 +91,6 @@ public void close() throws IOException {
 
     }
 
-    @SuppressWarnings("deprecation")
     @Override
     public int run(String[] args) throws Exception {
 
@@ -100,8 +99,7 @@ public int run(String[] args) throws Exception {
             + " <token file> <inputtable> <outputtable>");
       }
 
-      String user = getAdminPrincipal();
-      String tokenFile = args[0];
+      // String tokenFile = args[0];
       String table1 = args[1];
       String table2 = args[2];
 
@@ -120,10 +118,9 @@ public int run(String[] args) throws Exception {
       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(Mutation.class);
 
-      AccumuloOutputFormat.setConnectorInfo(job, user, tokenFile);
+      AccumuloOutputFormat.setClientInfo(job, getCluster().getClientInfo());
       AccumuloOutputFormat.setCreateTables(job, false);
       AccumuloOutputFormat.setDefaultTableName(job, table2);
-      AccumuloOutputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
 
       job.setNumReduceTasks(0);
 
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
index 5acc3ace1b..a69810b80a 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
@@ -89,8 +89,7 @@ public int run(String[] args) throws Exception {
             + " <token file> <inputtable> <outputtable>");
       }
 
-      String user = getAdminPrincipal();
-      String tokenFile = args[0];
+      // String tokenFile = args[0];
       String table1 = args[1];
       String table2 = args[2];
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services