You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2014/12/05 04:51:34 UTC

[4/6] accumulo git commit: ACCUMULO-1798 Add ability to specify compaction strategy for user-specified compactions.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/proxy/src/main/cpp/AccumuloProxy.h
----------------------------------------------------------------------
diff --git a/proxy/src/main/cpp/AccumuloProxy.h b/proxy/src/main/cpp/AccumuloProxy.h
index e9b7769..269884f 100644
--- a/proxy/src/main/cpp/AccumuloProxy.h
+++ b/proxy/src/main/cpp/AccumuloProxy.h
@@ -38,7 +38,7 @@ class AccumuloProxyIf {
   virtual void checkIteratorConflicts(const std::string& login, const std::string& tableName, const IteratorSetting& setting, const std::set<IteratorScope::type> & scopes) = 0;
   virtual void clearLocatorCache(const std::string& login, const std::string& tableName) = 0;
   virtual void cloneTable(const std::string& login, const std::string& tableName, const std::string& newTableName, const bool flush, const std::map<std::string, std::string> & propertiesToSet, const std::set<std::string> & propertiesToExclude) = 0;
-  virtual void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait) = 0;
+  virtual void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait, const CompactionStrategyConfig& compactionStrategy) = 0;
   virtual void cancelCompaction(const std::string& login, const std::string& tableName) = 0;
   virtual void createTable(const std::string& login, const std::string& tableName, const bool versioningIter, const TimeType::type type) = 0;
   virtual void deleteTable(const std::string& login, const std::string& tableName) = 0;
@@ -159,7 +159,7 @@ class AccumuloProxyNull : virtual public AccumuloProxyIf {
   void cloneTable(const std::string& /* login */, const std::string& /* tableName */, const std::string& /* newTableName */, const bool /* flush */, const std::map<std::string, std::string> & /* propertiesToSet */, const std::set<std::string> & /* propertiesToExclude */) { // no-op: every parameter is unnamed and unused
     return;
   }
-  void compactTable(const std::string& /* login */, const std::string& /* tableName */, const std::string& /* startRow */, const std::string& /* endRow */, const std::vector<IteratorSetting> & /* iterators */, const bool /* flush */, const bool /* wait */) {
+  void compactTable(const std::string& /* login */, const std::string& /* tableName */, const std::string& /* startRow */, const std::string& /* endRow */, const std::vector<IteratorSetting> & /* iterators */, const bool /* flush */, const bool /* wait */, const CompactionStrategyConfig& /* compactionStrategy */) { // no-op: every parameter, including the new compactionStrategy, is unnamed and unused
     return;
   }
   void cancelCompaction(const std::string& /* login */, const std::string& /* tableName */) {
@@ -1419,7 +1419,7 @@ class AccumuloProxy_cloneTable_presult {
 };
 
 typedef struct _AccumuloProxy_compactTable_args__isset {
-  _AccumuloProxy_compactTable_args__isset() : login(false), tableName(false), startRow(false), endRow(false), iterators(false), flush(false), wait(false) {}
+  _AccumuloProxy_compactTable_args__isset() : login(false), tableName(false), startRow(false), endRow(false), iterators(false), flush(false), wait(false), compactionStrategy(false) {}
   bool login;
   bool tableName;
   bool startRow;
@@ -1427,6 +1427,7 @@ typedef struct _AccumuloProxy_compactTable_args__isset {
   bool iterators;
   bool flush;
   bool wait;
+  bool compactionStrategy;
 } _AccumuloProxy_compactTable_args__isset;
 
 class AccumuloProxy_compactTable_args {
@@ -1444,6 +1445,7 @@ class AccumuloProxy_compactTable_args {
   std::vector<IteratorSetting>  iterators;
   bool flush;
   bool wait;
+  CompactionStrategyConfig compactionStrategy;
 
   _AccumuloProxy_compactTable_args__isset __isset;
 
@@ -1475,6 +1477,10 @@ class AccumuloProxy_compactTable_args {
     wait = val;
   }
 
+  void __set_compactionStrategy(const CompactionStrategyConfig& val) { // setter for the compactionStrategy member
+    compactionStrategy = val; // copy-assigns val; the matching __isset flag is not touched here
+  }
+
   bool operator == (const AccumuloProxy_compactTable_args & rhs) const
   {
     if (!(login == rhs.login))
@@ -1491,6 +1497,8 @@ class AccumuloProxy_compactTable_args {
       return false;
     if (!(wait == rhs.wait))
       return false;
+    if (!(compactionStrategy == rhs.compactionStrategy))
+      return false;
     return true;
   }
   bool operator != (const AccumuloProxy_compactTable_args &rhs) const {
@@ -1518,6 +1526,7 @@ class AccumuloProxy_compactTable_pargs {
   const std::vector<IteratorSetting> * iterators;
   const bool* flush;
   const bool* wait;
+  const CompactionStrategyConfig* compactionStrategy;
 
   uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
 
@@ -11342,8 +11351,8 @@ class AccumuloProxyClient : virtual public AccumuloProxyIf {
   void cloneTable(const std::string& login, const std::string& tableName, const std::string& newTableName, const bool flush, const std::map<std::string, std::string> & propertiesToSet, const std::set<std::string> & propertiesToExclude);
   void send_cloneTable(const std::string& login, const std::string& tableName, const std::string& newTableName, const bool flush, const std::map<std::string, std::string> & propertiesToSet, const std::set<std::string> & propertiesToExclude);
   void recv_cloneTable();
-  void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait);
-  void send_compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait);
+  void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait, const CompactionStrategyConfig& compactionStrategy);
+  void send_compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait, const CompactionStrategyConfig& compactionStrategy);
   void recv_compactTable();
   void cancelCompaction(const std::string& login, const std::string& tableName);
   void send_cancelCompaction(const std::string& login, const std::string& tableName);
@@ -11815,13 +11824,13 @@ class AccumuloProxyMultiface : virtual public AccumuloProxyIf {
     ifaces_[i]->cloneTable(login, tableName, newTableName, flush, propertiesToSet, propertiesToExclude);
   }
 
-  void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait) {
+  void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait, const CompactionStrategyConfig& compactionStrategy) { // forwards the call, arguments unchanged, to each entry of ifaces_ in index order
     size_t sz = ifaces_.size();
     size_t i = 0;
     for (; i < (sz - 1); ++i) { // NOTE(review): sz - 1 wraps around if ifaces_ is empty (size_t) — presumably never empty; confirm
-      ifaces_[i]->compactTable(login, tableName, startRow, endRow, iterators, flush, wait);
+      ifaces_[i]->compactTable(login, tableName, startRow, endRow, iterators, flush, wait, compactionStrategy);
     }
-    ifaces_[i]->compactTable(login, tableName, startRow, endRow, iterators, flush, wait);
+    ifaces_[i]->compactTable(login, tableName, startRow, endRow, iterators, flush, wait, compactionStrategy); // the last interface (index sz - 1) is invoked outside the loop
   }
 
   void cancelCompaction(const std::string& login, const std::string& tableName) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/proxy/src/main/cpp/AccumuloProxy_server.skeleton.cpp
----------------------------------------------------------------------
diff --git a/proxy/src/main/cpp/AccumuloProxy_server.skeleton.cpp b/proxy/src/main/cpp/AccumuloProxy_server.skeleton.cpp
index 2654c37..302aec2 100644
--- a/proxy/src/main/cpp/AccumuloProxy_server.skeleton.cpp
+++ b/proxy/src/main/cpp/AccumuloProxy_server.skeleton.cpp
@@ -73,7 +73,7 @@ class AccumuloProxyHandler : virtual public AccumuloProxyIf {
     printf("cloneTable\n");
   }
 
-  void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait) {
+  void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait, const CompactionStrategyConfig& compactionStrategy) { // skeleton stub: none of the arguments are used
     // Your implementation goes here
     printf("compactTable\n"); // placeholder body: only prints the method name
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/proxy/src/main/cpp/proxy_types.cpp
----------------------------------------------------------------------
diff --git a/proxy/src/main/cpp/proxy_types.cpp b/proxy/src/main/cpp/proxy_types.cpp
index edb0978..a055b48 100644
--- a/proxy/src/main/cpp/proxy_types.cpp
+++ b/proxy/src/main/cpp/proxy_types.cpp
@@ -2611,6 +2611,105 @@ void swap(WriterOptions &a, WriterOptions &b) {
   swap(a.__isset, b.__isset);
 }
 
+const char* CompactionStrategyConfig::ascii_fingerprint = "F7C641917C22B35AE581CCD54910B00D"; // hex spelling of the 16 bytes in binary_fingerprint
+const uint8_t CompactionStrategyConfig::binary_fingerprint[16] = {0xF7,0xC6,0x41,0x91,0x7C,0x22,0xB3,0x5A,0xE5,0x81,0xCC,0xD5,0x49,0x10,0xB0,0x0D}; // same value as ascii_fingerprint, as raw bytes
+
+uint32_t CompactionStrategyConfig::read(::apache::thrift::protocol::TProtocol* iprot) { // Populates this struct from iprot; returns the sum of the values returned by the iprot calls.
+
+  uint32_t xfer = 0; // running total of what each protocol call returns
+  std::string fname;
+  ::apache::thrift::protocol::TType ftype;
+  int16_t fid;
+
+  xfer += iprot->readStructBegin(fname);
+
+  using ::apache::thrift::protocol::TProtocolException;
+
+
+  while (true) // one iteration per field; the only exit is the T_STOP check below
+  {
+    xfer += iprot->readFieldBegin(fname, ftype, fid);
+    if (ftype == ::apache::thrift::protocol::T_STOP) {
+      break;
+    }
+    switch (fid) // dispatch on the numeric field id, not on fname
+    {
+      case 1: // className: accepted only when the wire type is T_STRING, otherwise skipped
+        if (ftype == ::apache::thrift::protocol::T_STRING) {
+          xfer += iprot->readString(this->className);
+          this->__isset.className = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      case 2: // options: accepted only when the wire type is T_MAP, otherwise skipped
+        if (ftype == ::apache::thrift::protocol::T_MAP) {
+          {
+            this->options.clear(); // drop any existing entries before refilling
+            uint32_t _size125;
+            ::apache::thrift::protocol::TType _ktype126;
+            ::apache::thrift::protocol::TType _vtype127;
+            xfer += iprot->readMapBegin(_ktype126, _vtype127, _size125); // reported key/value types are not checked; entries are read as strings
+            uint32_t _i129;
+            for (_i129 = 0; _i129 < _size125; ++_i129)
+            {
+              std::string _key130;
+              xfer += iprot->readString(_key130);
+              std::string& _val131 = this->options[_key130]; // operator[] creates the entry (or reuses it) so the value is read in place
+              xfer += iprot->readString(_val131);
+            }
+            xfer += iprot->readMapEnd();
+          }
+          this->__isset.options = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      default: // unrecognized field ids are skipped rather than treated as errors
+        xfer += iprot->skip(ftype);
+        break;
+    }
+    xfer += iprot->readFieldEnd();
+  }
+
+  xfer += iprot->readStructEnd();
+
+  return xfer;
+}
+
+uint32_t CompactionStrategyConfig::write(::apache::thrift::protocol::TProtocol* oprot) const { // Emits this struct to oprot; returns the sum of the values returned by the oprot calls.
+  uint32_t xfer = 0;
+  xfer += oprot->writeStructBegin("CompactionStrategyConfig");
+
+  xfer += oprot->writeFieldBegin("className", ::apache::thrift::protocol::T_STRING, 1); // field id 1; written without consulting __isset
+  xfer += oprot->writeString(this->className);
+  xfer += oprot->writeFieldEnd();
+
+  xfer += oprot->writeFieldBegin("options", ::apache::thrift::protocol::T_MAP, 2); // field id 2; written without consulting __isset
+  {
+    xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_STRING, ::apache::thrift::protocol::T_STRING, static_cast<uint32_t>(this->options.size())); // entry count is narrowed to uint32_t
+    std::map<std::string, std::string> ::const_iterator _iter132;
+    for (_iter132 = this->options.begin(); _iter132 != this->options.end(); ++_iter132) // entries go out in the map's iteration order
+    {
+      xfer += oprot->writeString(_iter132->first);
+      xfer += oprot->writeString(_iter132->second);
+    }
+    xfer += oprot->writeMapEnd();
+  }
+  xfer += oprot->writeFieldEnd();
+
+  xfer += oprot->writeFieldStop(); // presumably emits the T_STOP marker that read() breaks on — confirm against TProtocol
+  xfer += oprot->writeStructEnd();
+  return xfer;
+}
+
+void swap(CompactionStrategyConfig &a, CompactionStrategyConfig &b) { // exchanges className, options and __isset between a and b
+  using ::std::swap; // makes std::swap a candidate for the unqualified calls below
+  swap(a.className, b.className);
+  swap(a.options, b.options);
+  swap(a.__isset, b.__isset);
+}
+
 const char* UnknownScanner::ascii_fingerprint = "EFB929595D312AC8F305D5A794CFEDA1";
 const uint8_t UnknownScanner::binary_fingerprint[16] = {0xEF,0xB9,0x29,0x59,0x5D,0x31,0x2A,0xC8,0xF3,0x05,0xD5,0xA7,0x94,0xCF,0xED,0xA1};
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/proxy/src/main/cpp/proxy_types.h
----------------------------------------------------------------------
diff --git a/proxy/src/main/cpp/proxy_types.h b/proxy/src/main/cpp/proxy_types.h
index 625586c..569de88 100644
--- a/proxy/src/main/cpp/proxy_types.h
+++ b/proxy/src/main/cpp/proxy_types.h
@@ -1555,6 +1555,57 @@ class WriterOptions {
 
 void swap(WriterOptions &a, WriterOptions &b);
 
+typedef struct _CompactionStrategyConfig__isset { // one bool per CompactionStrategyConfig field
+  _CompactionStrategyConfig__isset() : className(false), options(false) {} // both flags start out false
+  bool className; // set to true by CompactionStrategyConfig::read when field 1 is read
+  bool options; // set to true by CompactionStrategyConfig::read when field 2 is read
+} _CompactionStrategyConfig__isset;
+
+class CompactionStrategyConfig { // struct with a string className and a string-to-string options map, plus read/write over a TProtocol
+ public:
+
+  static const char* ascii_fingerprint; // = "F7C641917C22B35AE581CCD54910B00D";
+  static const uint8_t binary_fingerprint[16]; // = {0xF7,0xC6,0x41,0x91,0x7C,0x22,0xB3,0x5A,0xE5,0x81,0xCC,0xD5,0x49,0x10,0xB0,0x0D};
+
+  CompactionStrategyConfig() : className() { // className starts empty; options and __isset use their own default constructors
+  }
+
+  virtual ~CompactionStrategyConfig() throw() {}
+
+  std::string className; // field id 1 in read()/write()
+  std::map<std::string, std::string>  options; // field id 2 in read()/write()
+
+  _CompactionStrategyConfig__isset __isset; // per-field flags; not consulted by operator== below
+
+  void __set_className(const std::string& val) { // assigns the field only; __isset.className is left unchanged
+    className = val;
+  }
+
+  void __set_options(const std::map<std::string, std::string> & val) { // assigns the field only; __isset.options is left unchanged
+    options = val;
+  }
+
+  bool operator == (const CompactionStrategyConfig & rhs) const // equal when both className and options compare equal
+  {
+    if (!(className == rhs.className))
+      return false;
+    if (!(options == rhs.options))
+      return false;
+    return true;
+  }
+  bool operator != (const CompactionStrategyConfig &rhs) const { // negation of operator==
+    return !(*this == rhs);
+  }
+
+  bool operator < (const CompactionStrategyConfig & ) const; // declared only; no definition appears in this diff
+
+  uint32_t read(::apache::thrift::protocol::TProtocol* iprot); // defined in proxy_types.cpp
+  uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; // defined in proxy_types.cpp
+
+};
+
+void swap(CompactionStrategyConfig &a, CompactionStrategyConfig &b);
+
 typedef struct _UnknownScanner__isset {
   _UnknownScanner__isset() : msg(false) {}
   bool msg;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
----------------------------------------------------------------------
diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
index bd0782d..b51d43d 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
@@ -56,6 +56,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.admin.ActiveCompaction;
 import org.apache.accumulo.core.client.admin.ActiveScan;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
@@ -83,6 +84,7 @@ import org.apache.accumulo.proxy.thrift.AccumuloProxy;
 import org.apache.accumulo.proxy.thrift.BatchScanOptions;
 import org.apache.accumulo.proxy.thrift.ColumnUpdate;
 import org.apache.accumulo.proxy.thrift.CompactionReason;
+import org.apache.accumulo.proxy.thrift.CompactionStrategyConfig;
 import org.apache.accumulo.proxy.thrift.CompactionType;
 import org.apache.accumulo.proxy.thrift.Condition;
 import org.apache.accumulo.proxy.thrift.ConditionalStatus;
@@ -331,12 +333,22 @@ public class ProxyServer implements AccumuloProxy.Iface {
   
   @Override
   public void compactTable(ByteBuffer login, String tableName, ByteBuffer startRow, ByteBuffer endRow,
-      List<org.apache.accumulo.proxy.thrift.IteratorSetting> iterators, boolean flush, boolean wait)
+      List<org.apache.accumulo.proxy.thrift.IteratorSetting> iterators, boolean flush, boolean wait, CompactionStrategyConfig compactionStrategy)
       throws org.apache.accumulo.proxy.thrift.AccumuloSecurityException, org.apache.accumulo.proxy.thrift.TableNotFoundException,
       org.apache.accumulo.proxy.thrift.AccumuloException, TException {
     try {
-      getConnector(login).tableOperations().compact(tableName, ByteBufferUtil.toText(startRow), ByteBufferUtil.toText(endRow), getIteratorSettings(iterators),
-          flush, wait);
+      CompactionConfig compactionConfig = new CompactionConfig().setStartRow(ByteBufferUtil.toText(startRow)).setEndRow(ByteBufferUtil.toText(endRow))
+          .setIterators(getIteratorSettings(iterators)).setFlush(flush).setWait(wait);
+      
+      if (compactionStrategy != null) {
+        org.apache.accumulo.core.client.admin.CompactionStrategyConfig ccc = new org.apache.accumulo.core.client.admin.CompactionStrategyConfig(
+            compactionStrategy.getClassName());
+        if (compactionStrategy.options != null)
+          ccc.setOptions(compactionStrategy.options);
+        compactionConfig.setCompactionStrategy(ccc);
+      }
+
+      getConnector(login).tableOperations().compact(tableName, compactionConfig);
     } catch (Exception e) {
       handleExceptionTNF(e);
     }