You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2014/12/05 04:51:34 UTC
[4/6] accumulo git commit: ACCUMULO-1798 Add ability to specify
compaction strategy for user specificed compactions.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/proxy/src/main/cpp/AccumuloProxy.h
----------------------------------------------------------------------
diff --git a/proxy/src/main/cpp/AccumuloProxy.h b/proxy/src/main/cpp/AccumuloProxy.h
index e9b7769..269884f 100644
--- a/proxy/src/main/cpp/AccumuloProxy.h
+++ b/proxy/src/main/cpp/AccumuloProxy.h
@@ -38,7 +38,7 @@ class AccumuloProxyIf {
virtual void checkIteratorConflicts(const std::string& login, const std::string& tableName, const IteratorSetting& setting, const std::set<IteratorScope::type> & scopes) = 0;
virtual void clearLocatorCache(const std::string& login, const std::string& tableName) = 0;
virtual void cloneTable(const std::string& login, const std::string& tableName, const std::string& newTableName, const bool flush, const std::map<std::string, std::string> & propertiesToSet, const std::set<std::string> & propertiesToExclude) = 0;
- virtual void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait) = 0;
+ virtual void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait, const CompactionStrategyConfig& compactionStrategy) = 0;
virtual void cancelCompaction(const std::string& login, const std::string& tableName) = 0;
virtual void createTable(const std::string& login, const std::string& tableName, const bool versioningIter, const TimeType::type type) = 0;
virtual void deleteTable(const std::string& login, const std::string& tableName) = 0;
@@ -159,7 +159,7 @@ class AccumuloProxyNull : virtual public AccumuloProxyIf {
void cloneTable(const std::string& /* login */, const std::string& /* tableName */, const std::string& /* newTableName */, const bool /* flush */, const std::map<std::string, std::string> & /* propertiesToSet */, const std::set<std::string> & /* propertiesToExclude */) {
return;
}
- void compactTable(const std::string& /* login */, const std::string& /* tableName */, const std::string& /* startRow */, const std::string& /* endRow */, const std::vector<IteratorSetting> & /* iterators */, const bool /* flush */, const bool /* wait */) {
+ void compactTable(const std::string& /* login */, const std::string& /* tableName */, const std::string& /* startRow */, const std::string& /* endRow */, const std::vector<IteratorSetting> & /* iterators */, const bool /* flush */, const bool /* wait */, const CompactionStrategyConfig& /* compactionStrategy */) {
return;
}
void cancelCompaction(const std::string& /* login */, const std::string& /* tableName */) {
@@ -1419,7 +1419,7 @@ class AccumuloProxy_cloneTable_presult {
};
typedef struct _AccumuloProxy_compactTable_args__isset {
- _AccumuloProxy_compactTable_args__isset() : login(false), tableName(false), startRow(false), endRow(false), iterators(false), flush(false), wait(false) {}
+ _AccumuloProxy_compactTable_args__isset() : login(false), tableName(false), startRow(false), endRow(false), iterators(false), flush(false), wait(false), compactionStrategy(false) {}
bool login;
bool tableName;
bool startRow;
@@ -1427,6 +1427,7 @@ typedef struct _AccumuloProxy_compactTable_args__isset {
bool iterators;
bool flush;
bool wait;
+ bool compactionStrategy;
} _AccumuloProxy_compactTable_args__isset;
class AccumuloProxy_compactTable_args {
@@ -1444,6 +1445,7 @@ class AccumuloProxy_compactTable_args {
std::vector<IteratorSetting> iterators;
bool flush;
bool wait;
+ CompactionStrategyConfig compactionStrategy;
_AccumuloProxy_compactTable_args__isset __isset;
@@ -1475,6 +1477,10 @@ class AccumuloProxy_compactTable_args {
wait = val;
}
+ void __set_compactionStrategy(const CompactionStrategyConfig& val) {
+ compactionStrategy = val;
+ }
+
bool operator == (const AccumuloProxy_compactTable_args & rhs) const
{
if (!(login == rhs.login))
@@ -1491,6 +1497,8 @@ class AccumuloProxy_compactTable_args {
return false;
if (!(wait == rhs.wait))
return false;
+ if (!(compactionStrategy == rhs.compactionStrategy))
+ return false;
return true;
}
bool operator != (const AccumuloProxy_compactTable_args &rhs) const {
@@ -1518,6 +1526,7 @@ class AccumuloProxy_compactTable_pargs {
const std::vector<IteratorSetting> * iterators;
const bool* flush;
const bool* wait;
+ const CompactionStrategyConfig* compactionStrategy;
uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
@@ -11342,8 +11351,8 @@ class AccumuloProxyClient : virtual public AccumuloProxyIf {
void cloneTable(const std::string& login, const std::string& tableName, const std::string& newTableName, const bool flush, const std::map<std::string, std::string> & propertiesToSet, const std::set<std::string> & propertiesToExclude);
void send_cloneTable(const std::string& login, const std::string& tableName, const std::string& newTableName, const bool flush, const std::map<std::string, std::string> & propertiesToSet, const std::set<std::string> & propertiesToExclude);
void recv_cloneTable();
- void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait);
- void send_compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait);
+ void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait, const CompactionStrategyConfig& compactionStrategy);
+ void send_compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait, const CompactionStrategyConfig& compactionStrategy);
void recv_compactTable();
void cancelCompaction(const std::string& login, const std::string& tableName);
void send_cancelCompaction(const std::string& login, const std::string& tableName);
@@ -11815,13 +11824,13 @@ class AccumuloProxyMultiface : virtual public AccumuloProxyIf {
ifaces_[i]->cloneTable(login, tableName, newTableName, flush, propertiesToSet, propertiesToExclude);
}
- void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait) {
+ void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait, const CompactionStrategyConfig& compactionStrategy) {
size_t sz = ifaces_.size();
size_t i = 0;
for (; i < (sz - 1); ++i) {
- ifaces_[i]->compactTable(login, tableName, startRow, endRow, iterators, flush, wait);
+ ifaces_[i]->compactTable(login, tableName, startRow, endRow, iterators, flush, wait, compactionStrategy);
}
- ifaces_[i]->compactTable(login, tableName, startRow, endRow, iterators, flush, wait);
+ ifaces_[i]->compactTable(login, tableName, startRow, endRow, iterators, flush, wait, compactionStrategy);
}
void cancelCompaction(const std::string& login, const std::string& tableName) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/proxy/src/main/cpp/AccumuloProxy_server.skeleton.cpp
----------------------------------------------------------------------
diff --git a/proxy/src/main/cpp/AccumuloProxy_server.skeleton.cpp b/proxy/src/main/cpp/AccumuloProxy_server.skeleton.cpp
index 2654c37..302aec2 100644
--- a/proxy/src/main/cpp/AccumuloProxy_server.skeleton.cpp
+++ b/proxy/src/main/cpp/AccumuloProxy_server.skeleton.cpp
@@ -73,7 +73,7 @@ class AccumuloProxyHandler : virtual public AccumuloProxyIf {
printf("cloneTable\n");
}
- void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait) {
+ void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector<IteratorSetting> & iterators, const bool flush, const bool wait, const CompactionStrategyConfig& compactionStrategy) {
// Your implementation goes here
printf("compactTable\n");
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/proxy/src/main/cpp/proxy_types.cpp
----------------------------------------------------------------------
diff --git a/proxy/src/main/cpp/proxy_types.cpp b/proxy/src/main/cpp/proxy_types.cpp
index edb0978..a055b48 100644
--- a/proxy/src/main/cpp/proxy_types.cpp
+++ b/proxy/src/main/cpp/proxy_types.cpp
@@ -2611,6 +2611,105 @@ void swap(WriterOptions &a, WriterOptions &b) {
swap(a.__isset, b.__isset);
}
+const char* CompactionStrategyConfig::ascii_fingerprint = "F7C641917C22B35AE581CCD54910B00D";
+const uint8_t CompactionStrategyConfig::binary_fingerprint[16] = {0xF7,0xC6,0x41,0x91,0x7C,0x22,0xB3,0x5A,0xE5,0x81,0xCC,0xD5,0x49,0x10,0xB0,0x0D};
+
+uint32_t CompactionStrategyConfig::read(::apache::thrift::protocol::TProtocol* iprot) {
+
+ uint32_t xfer = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException;
+
+
+ while (true)
+ {
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid)
+ {
+ case 1:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->className);
+ this->__isset.className = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 2:
+ if (ftype == ::apache::thrift::protocol::T_MAP) {
+ {
+ this->options.clear();
+ uint32_t _size125;
+ ::apache::thrift::protocol::TType _ktype126;
+ ::apache::thrift::protocol::TType _vtype127;
+ xfer += iprot->readMapBegin(_ktype126, _vtype127, _size125);
+ uint32_t _i129;
+ for (_i129 = 0; _i129 < _size125; ++_i129)
+ {
+ std::string _key130;
+ xfer += iprot->readString(_key130);
+ std::string& _val131 = this->options[_key130];
+ xfer += iprot->readString(_val131);
+ }
+ xfer += iprot->readMapEnd();
+ }
+ this->__isset.options = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ default:
+ xfer += iprot->skip(ftype);
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ return xfer;
+}
+
+uint32_t CompactionStrategyConfig::write(::apache::thrift::protocol::TProtocol* oprot) const {
+ uint32_t xfer = 0;
+ xfer += oprot->writeStructBegin("CompactionStrategyConfig");
+
+ xfer += oprot->writeFieldBegin("className", ::apache::thrift::protocol::T_STRING, 1);
+ xfer += oprot->writeString(this->className);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("options", ::apache::thrift::protocol::T_MAP, 2);
+ {
+ xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_STRING, ::apache::thrift::protocol::T_STRING, static_cast<uint32_t>(this->options.size()));
+ std::map<std::string, std::string> ::const_iterator _iter132;
+ for (_iter132 = this->options.begin(); _iter132 != this->options.end(); ++_iter132)
+ {
+ xfer += oprot->writeString(_iter132->first);
+ xfer += oprot->writeString(_iter132->second);
+ }
+ xfer += oprot->writeMapEnd();
+ }
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldStop();
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+void swap(CompactionStrategyConfig &a, CompactionStrategyConfig &b) {
+ using ::std::swap;
+ swap(a.className, b.className);
+ swap(a.options, b.options);
+ swap(a.__isset, b.__isset);
+}
+
const char* UnknownScanner::ascii_fingerprint = "EFB929595D312AC8F305D5A794CFEDA1";
const uint8_t UnknownScanner::binary_fingerprint[16] = {0xEF,0xB9,0x29,0x59,0x5D,0x31,0x2A,0xC8,0xF3,0x05,0xD5,0xA7,0x94,0xCF,0xED,0xA1};
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/proxy/src/main/cpp/proxy_types.h
----------------------------------------------------------------------
diff --git a/proxy/src/main/cpp/proxy_types.h b/proxy/src/main/cpp/proxy_types.h
index 625586c..569de88 100644
--- a/proxy/src/main/cpp/proxy_types.h
+++ b/proxy/src/main/cpp/proxy_types.h
@@ -1555,6 +1555,57 @@ class WriterOptions {
void swap(WriterOptions &a, WriterOptions &b);
+typedef struct _CompactionStrategyConfig__isset {
+ _CompactionStrategyConfig__isset() : className(false), options(false) {}
+ bool className;
+ bool options;
+} _CompactionStrategyConfig__isset;
+
+class CompactionStrategyConfig {
+ public:
+
+ static const char* ascii_fingerprint; // = "F7C641917C22B35AE581CCD54910B00D";
+ static const uint8_t binary_fingerprint[16]; // = {0xF7,0xC6,0x41,0x91,0x7C,0x22,0xB3,0x5A,0xE5,0x81,0xCC,0xD5,0x49,0x10,0xB0,0x0D};
+
+ CompactionStrategyConfig() : className() {
+ }
+
+ virtual ~CompactionStrategyConfig() throw() {}
+
+ std::string className;
+ std::map<std::string, std::string> options;
+
+ _CompactionStrategyConfig__isset __isset;
+
+ void __set_className(const std::string& val) {
+ className = val;
+ }
+
+ void __set_options(const std::map<std::string, std::string> & val) {
+ options = val;
+ }
+
+ bool operator == (const CompactionStrategyConfig & rhs) const
+ {
+ if (!(className == rhs.className))
+ return false;
+ if (!(options == rhs.options))
+ return false;
+ return true;
+ }
+ bool operator != (const CompactionStrategyConfig &rhs) const {
+ return !(*this == rhs);
+ }
+
+ bool operator < (const CompactionStrategyConfig & ) const;
+
+ uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+ uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+void swap(CompactionStrategyConfig &a, CompactionStrategyConfig &b);
+
typedef struct _UnknownScanner__isset {
_UnknownScanner__isset() : msg(false) {}
bool msg;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
----------------------------------------------------------------------
diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
index bd0782d..b51d43d 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
@@ -56,6 +56,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.admin.ActiveCompaction;
import org.apache.accumulo.core.client.admin.ActiveScan;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
@@ -83,6 +84,7 @@ import org.apache.accumulo.proxy.thrift.AccumuloProxy;
import org.apache.accumulo.proxy.thrift.BatchScanOptions;
import org.apache.accumulo.proxy.thrift.ColumnUpdate;
import org.apache.accumulo.proxy.thrift.CompactionReason;
+import org.apache.accumulo.proxy.thrift.CompactionStrategyConfig;
import org.apache.accumulo.proxy.thrift.CompactionType;
import org.apache.accumulo.proxy.thrift.Condition;
import org.apache.accumulo.proxy.thrift.ConditionalStatus;
@@ -331,12 +333,22 @@ public class ProxyServer implements AccumuloProxy.Iface {
@Override
public void compactTable(ByteBuffer login, String tableName, ByteBuffer startRow, ByteBuffer endRow,
- List<org.apache.accumulo.proxy.thrift.IteratorSetting> iterators, boolean flush, boolean wait)
+ List<org.apache.accumulo.proxy.thrift.IteratorSetting> iterators, boolean flush, boolean wait, CompactionStrategyConfig compactionStrategy)
throws org.apache.accumulo.proxy.thrift.AccumuloSecurityException, org.apache.accumulo.proxy.thrift.TableNotFoundException,
org.apache.accumulo.proxy.thrift.AccumuloException, TException {
try {
- getConnector(login).tableOperations().compact(tableName, ByteBufferUtil.toText(startRow), ByteBufferUtil.toText(endRow), getIteratorSettings(iterators),
- flush, wait);
+ CompactionConfig compactionConfig = new CompactionConfig().setStartRow(ByteBufferUtil.toText(startRow)).setEndRow(ByteBufferUtil.toText(endRow))
+ .setIterators(getIteratorSettings(iterators)).setFlush(flush).setWait(wait);
+
+ if (compactionStrategy != null) {
+ org.apache.accumulo.core.client.admin.CompactionStrategyConfig ccc = new org.apache.accumulo.core.client.admin.CompactionStrategyConfig(
+ compactionStrategy.getClassName());
+ if (compactionStrategy.options != null)
+ ccc.setOptions(compactionStrategy.options);
+ compactionConfig.setCompactionStrategy(ccc);
+ }
+
+ getConnector(login).tableOperations().compact(tableName, compactionConfig);
} catch (Exception e) {
handleExceptionTNF(e);
}