You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2017/03/20 14:49:02 UTC
[6/9] accumulo git commit: ACCUMULO-4501 ACCUMULO-96 Added
Summarization
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/data/thrift/TSummaryRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/data/thrift/TSummaryRequest.java b/core/src/main/java/org/apache/accumulo/core/data/thrift/TSummaryRequest.java
new file mode 100644
index 0000000..78c242e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/data/thrift/TSummaryRequest.java
@@ -0,0 +1,760 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.10.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.accumulo.core.data.thrift;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.10.0)")
+public class TSummaryRequest implements org.apache.thrift.TBase<TSummaryRequest, TSummaryRequest._Fields>, java.io.Serializable, Cloneable, Comparable<TSummaryRequest> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TSummaryRequest");
+
+ private static final org.apache.thrift.protocol.TField TABLE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("tableId", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField BOUNDS_FIELD_DESC = new org.apache.thrift.protocol.TField("bounds", org.apache.thrift.protocol.TType.STRUCT, (short)2);
+ private static final org.apache.thrift.protocol.TField SUMMARIZERS_FIELD_DESC = new org.apache.thrift.protocol.TField("summarizers", org.apache.thrift.protocol.TType.LIST, (short)3);
+ private static final org.apache.thrift.protocol.TField SUMMARIZER_PATTERN_FIELD_DESC = new org.apache.thrift.protocol.TField("summarizerPattern", org.apache.thrift.protocol.TType.STRING, (short)4);
+
+ private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new TSummaryRequestStandardSchemeFactory();
+ private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new TSummaryRequestTupleSchemeFactory();
+
+ public java.lang.String tableId; // required
+ public TRowRange bounds; // required
+ public java.util.List<TSummarizerConfiguration> summarizers; // required
+ public java.lang.String summarizerPattern; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ TABLE_ID((short)1, "tableId"),
+ BOUNDS((short)2, "bounds"),
+ SUMMARIZERS((short)3, "summarizers"),
+ SUMMARIZER_PATTERN((short)4, "summarizerPattern");
+
+ private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+ static {
+ for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+   * Find the _Fields constant that matches fieldId, or null if it is not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // TABLE_ID
+ return TABLE_ID;
+ case 2: // BOUNDS
+ return BOUNDS;
+ case 3: // SUMMARIZERS
+ return SUMMARIZERS;
+ case 4: // SUMMARIZER_PATTERN
+ return SUMMARIZER_PATTERN;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+   * Find the _Fields constant that matches name, or null if it is not found.
+ */
+ public static _Fields findByName(java.lang.String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final java.lang.String _fieldName;
+
+ _Fields(short thriftId, java.lang.String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public java.lang.String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.TABLE_ID, new org.apache.thrift.meta_data.FieldMetaData("tableId", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.BOUNDS, new org.apache.thrift.meta_data.FieldMetaData("bounds", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TRowRange.class)));
+ tmpMap.put(_Fields.SUMMARIZERS, new org.apache.thrift.meta_data.FieldMetaData("summarizers", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TSummarizerConfiguration.class))));
+ tmpMap.put(_Fields.SUMMARIZER_PATTERN, new org.apache.thrift.meta_data.FieldMetaData("summarizerPattern", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TSummaryRequest.class, metaDataMap);
+ }
+
+ public TSummaryRequest() {
+ }
+
+ public TSummaryRequest(
+ java.lang.String tableId,
+ TRowRange bounds,
+ java.util.List<TSummarizerConfiguration> summarizers,
+ java.lang.String summarizerPattern)
+ {
+ this();
+ this.tableId = tableId;
+ this.bounds = bounds;
+ this.summarizers = summarizers;
+ this.summarizerPattern = summarizerPattern;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public TSummaryRequest(TSummaryRequest other) {
+ if (other.isSetTableId()) {
+ this.tableId = other.tableId;
+ }
+ if (other.isSetBounds()) {
+ this.bounds = new TRowRange(other.bounds);
+ }
+ if (other.isSetSummarizers()) {
+ java.util.List<TSummarizerConfiguration> __this__summarizers = new java.util.ArrayList<TSummarizerConfiguration>(other.summarizers.size());
+ for (TSummarizerConfiguration other_element : other.summarizers) {
+ __this__summarizers.add(new TSummarizerConfiguration(other_element));
+ }
+ this.summarizers = __this__summarizers;
+ }
+ if (other.isSetSummarizerPattern()) {
+ this.summarizerPattern = other.summarizerPattern;
+ }
+ }
+
+ public TSummaryRequest deepCopy() {
+ return new TSummaryRequest(this);
+ }
+
+ @Override
+ public void clear() {
+ this.tableId = null;
+ this.bounds = null;
+ this.summarizers = null;
+ this.summarizerPattern = null;
+ }
+
+ public java.lang.String getTableId() {
+ return this.tableId;
+ }
+
+ public TSummaryRequest setTableId(java.lang.String tableId) {
+ this.tableId = tableId;
+ return this;
+ }
+
+ public void unsetTableId() {
+ this.tableId = null;
+ }
+
+ /** Returns true if field tableId is set (has been assigned a value) and false otherwise */
+ public boolean isSetTableId() {
+ return this.tableId != null;
+ }
+
+ public void setTableIdIsSet(boolean value) {
+ if (!value) {
+ this.tableId = null;
+ }
+ }
+
+ public TRowRange getBounds() {
+ return this.bounds;
+ }
+
+ public TSummaryRequest setBounds(TRowRange bounds) {
+ this.bounds = bounds;
+ return this;
+ }
+
+ public void unsetBounds() {
+ this.bounds = null;
+ }
+
+ /** Returns true if field bounds is set (has been assigned a value) and false otherwise */
+ public boolean isSetBounds() {
+ return this.bounds != null;
+ }
+
+ public void setBoundsIsSet(boolean value) {
+ if (!value) {
+ this.bounds = null;
+ }
+ }
+
+ public int getSummarizersSize() {
+ return (this.summarizers == null) ? 0 : this.summarizers.size();
+ }
+
+ public java.util.Iterator<TSummarizerConfiguration> getSummarizersIterator() {
+ return (this.summarizers == null) ? null : this.summarizers.iterator();
+ }
+
+ public void addToSummarizers(TSummarizerConfiguration elem) {
+ if (this.summarizers == null) {
+ this.summarizers = new java.util.ArrayList<TSummarizerConfiguration>();
+ }
+ this.summarizers.add(elem);
+ }
+
+ public java.util.List<TSummarizerConfiguration> getSummarizers() {
+ return this.summarizers;
+ }
+
+ public TSummaryRequest setSummarizers(java.util.List<TSummarizerConfiguration> summarizers) {
+ this.summarizers = summarizers;
+ return this;
+ }
+
+ public void unsetSummarizers() {
+ this.summarizers = null;
+ }
+
+ /** Returns true if field summarizers is set (has been assigned a value) and false otherwise */
+ public boolean isSetSummarizers() {
+ return this.summarizers != null;
+ }
+
+ public void setSummarizersIsSet(boolean value) {
+ if (!value) {
+ this.summarizers = null;
+ }
+ }
+
+ public java.lang.String getSummarizerPattern() {
+ return this.summarizerPattern;
+ }
+
+ public TSummaryRequest setSummarizerPattern(java.lang.String summarizerPattern) {
+ this.summarizerPattern = summarizerPattern;
+ return this;
+ }
+
+ public void unsetSummarizerPattern() {
+ this.summarizerPattern = null;
+ }
+
+ /** Returns true if field summarizerPattern is set (has been assigned a value) and false otherwise */
+ public boolean isSetSummarizerPattern() {
+ return this.summarizerPattern != null;
+ }
+
+ public void setSummarizerPatternIsSet(boolean value) {
+ if (!value) {
+ this.summarizerPattern = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, java.lang.Object value) {
+ switch (field) {
+ case TABLE_ID:
+ if (value == null) {
+ unsetTableId();
+ } else {
+ setTableId((java.lang.String)value);
+ }
+ break;
+
+ case BOUNDS:
+ if (value == null) {
+ unsetBounds();
+ } else {
+ setBounds((TRowRange)value);
+ }
+ break;
+
+ case SUMMARIZERS:
+ if (value == null) {
+ unsetSummarizers();
+ } else {
+ setSummarizers((java.util.List<TSummarizerConfiguration>)value);
+ }
+ break;
+
+ case SUMMARIZER_PATTERN:
+ if (value == null) {
+ unsetSummarizerPattern();
+ } else {
+ setSummarizerPattern((java.lang.String)value);
+ }
+ break;
+
+ }
+ }
+
+ public java.lang.Object getFieldValue(_Fields field) {
+ switch (field) {
+ case TABLE_ID:
+ return getTableId();
+
+ case BOUNDS:
+ return getBounds();
+
+ case SUMMARIZERS:
+ return getSummarizers();
+
+ case SUMMARIZER_PATTERN:
+ return getSummarizerPattern();
+
+ }
+ throw new java.lang.IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new java.lang.IllegalArgumentException();
+ }
+
+ switch (field) {
+ case TABLE_ID:
+ return isSetTableId();
+ case BOUNDS:
+ return isSetBounds();
+ case SUMMARIZERS:
+ return isSetSummarizers();
+ case SUMMARIZER_PATTERN:
+ return isSetSummarizerPattern();
+ }
+ throw new java.lang.IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(java.lang.Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof TSummaryRequest)
+ return this.equals((TSummaryRequest)that);
+ return false;
+ }
+
+ public boolean equals(TSummaryRequest that) {
+ if (that == null)
+ return false;
+ if (this == that)
+ return true;
+
+ boolean this_present_tableId = true && this.isSetTableId();
+ boolean that_present_tableId = true && that.isSetTableId();
+ if (this_present_tableId || that_present_tableId) {
+ if (!(this_present_tableId && that_present_tableId))
+ return false;
+ if (!this.tableId.equals(that.tableId))
+ return false;
+ }
+
+ boolean this_present_bounds = true && this.isSetBounds();
+ boolean that_present_bounds = true && that.isSetBounds();
+ if (this_present_bounds || that_present_bounds) {
+ if (!(this_present_bounds && that_present_bounds))
+ return false;
+ if (!this.bounds.equals(that.bounds))
+ return false;
+ }
+
+ boolean this_present_summarizers = true && this.isSetSummarizers();
+ boolean that_present_summarizers = true && that.isSetSummarizers();
+ if (this_present_summarizers || that_present_summarizers) {
+ if (!(this_present_summarizers && that_present_summarizers))
+ return false;
+ if (!this.summarizers.equals(that.summarizers))
+ return false;
+ }
+
+ boolean this_present_summarizerPattern = true && this.isSetSummarizerPattern();
+ boolean that_present_summarizerPattern = true && that.isSetSummarizerPattern();
+ if (this_present_summarizerPattern || that_present_summarizerPattern) {
+ if (!(this_present_summarizerPattern && that_present_summarizerPattern))
+ return false;
+ if (!this.summarizerPattern.equals(that.summarizerPattern))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int hashCode = 1;
+
+ hashCode = hashCode * 8191 + ((isSetTableId()) ? 131071 : 524287);
+ if (isSetTableId())
+ hashCode = hashCode * 8191 + tableId.hashCode();
+
+ hashCode = hashCode * 8191 + ((isSetBounds()) ? 131071 : 524287);
+ if (isSetBounds())
+ hashCode = hashCode * 8191 + bounds.hashCode();
+
+ hashCode = hashCode * 8191 + ((isSetSummarizers()) ? 131071 : 524287);
+ if (isSetSummarizers())
+ hashCode = hashCode * 8191 + summarizers.hashCode();
+
+ hashCode = hashCode * 8191 + ((isSetSummarizerPattern()) ? 131071 : 524287);
+ if (isSetSummarizerPattern())
+ hashCode = hashCode * 8191 + summarizerPattern.hashCode();
+
+ return hashCode;
+ }
+
+ @Override
+ public int compareTo(TSummaryRequest other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = java.lang.Boolean.valueOf(isSetTableId()).compareTo(other.isSetTableId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetTableId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.tableId, other.tableId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = java.lang.Boolean.valueOf(isSetBounds()).compareTo(other.isSetBounds());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetBounds()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.bounds, other.bounds);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = java.lang.Boolean.valueOf(isSetSummarizers()).compareTo(other.isSetSummarizers());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetSummarizers()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.summarizers, other.summarizers);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = java.lang.Boolean.valueOf(isSetSummarizerPattern()).compareTo(other.isSetSummarizerPattern());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetSummarizerPattern()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.summarizerPattern, other.summarizerPattern);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ scheme(iprot).read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ scheme(oprot).write(oprot, this);
+ }
+
+ @Override
+ public java.lang.String toString() {
+ java.lang.StringBuilder sb = new java.lang.StringBuilder("TSummaryRequest(");
+ boolean first = true;
+
+ sb.append("tableId:");
+ if (this.tableId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.tableId);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("bounds:");
+ if (this.bounds == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.bounds);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("summarizers:");
+ if (this.summarizers == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.summarizers);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("summarizerPattern:");
+ if (this.summarizerPattern == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.summarizerPattern);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ if (bounds != null) {
+ bounds.validate();
+ }
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class TSummaryRequestStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+ public TSummaryRequestStandardScheme getScheme() {
+ return new TSummaryRequestStandardScheme();
+ }
+ }
+
+ private static class TSummaryRequestStandardScheme extends org.apache.thrift.scheme.StandardScheme<TSummaryRequest> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, TSummaryRequest struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // TABLE_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.tableId = iprot.readString();
+ struct.setTableIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // BOUNDS
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+ struct.bounds = new TRowRange();
+ struct.bounds.read(iprot);
+ struct.setBoundsIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 3: // SUMMARIZERS
+ if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
+ {
+ org.apache.thrift.protocol.TList _list122 = iprot.readListBegin();
+ struct.summarizers = new java.util.ArrayList<TSummarizerConfiguration>(_list122.size);
+ TSummarizerConfiguration _elem123;
+ for (int _i124 = 0; _i124 < _list122.size; ++_i124)
+ {
+ _elem123 = new TSummarizerConfiguration();
+ _elem123.read(iprot);
+ struct.summarizers.add(_elem123);
+ }
+ iprot.readListEnd();
+ }
+ struct.setSummarizersIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 4: // SUMMARIZER_PATTERN
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.summarizerPattern = iprot.readString();
+ struct.setSummarizerPatternIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, TSummaryRequest struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.tableId != null) {
+ oprot.writeFieldBegin(TABLE_ID_FIELD_DESC);
+ oprot.writeString(struct.tableId);
+ oprot.writeFieldEnd();
+ }
+ if (struct.bounds != null) {
+ oprot.writeFieldBegin(BOUNDS_FIELD_DESC);
+ struct.bounds.write(oprot);
+ oprot.writeFieldEnd();
+ }
+ if (struct.summarizers != null) {
+ oprot.writeFieldBegin(SUMMARIZERS_FIELD_DESC);
+ {
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.summarizers.size()));
+ for (TSummarizerConfiguration _iter125 : struct.summarizers)
+ {
+ _iter125.write(oprot);
+ }
+ oprot.writeListEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ if (struct.summarizerPattern != null) {
+ oprot.writeFieldBegin(SUMMARIZER_PATTERN_FIELD_DESC);
+ oprot.writeString(struct.summarizerPattern);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class TSummaryRequestTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+ public TSummaryRequestTupleScheme getScheme() {
+ return new TSummaryRequestTupleScheme();
+ }
+ }
+
+ private static class TSummaryRequestTupleScheme extends org.apache.thrift.scheme.TupleScheme<TSummaryRequest> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, TSummaryRequest struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+ java.util.BitSet optionals = new java.util.BitSet();
+ if (struct.isSetTableId()) {
+ optionals.set(0);
+ }
+ if (struct.isSetBounds()) {
+ optionals.set(1);
+ }
+ if (struct.isSetSummarizers()) {
+ optionals.set(2);
+ }
+ if (struct.isSetSummarizerPattern()) {
+ optionals.set(3);
+ }
+ oprot.writeBitSet(optionals, 4);
+ if (struct.isSetTableId()) {
+ oprot.writeString(struct.tableId);
+ }
+ if (struct.isSetBounds()) {
+ struct.bounds.write(oprot);
+ }
+ if (struct.isSetSummarizers()) {
+ {
+ oprot.writeI32(struct.summarizers.size());
+ for (TSummarizerConfiguration _iter126 : struct.summarizers)
+ {
+ _iter126.write(oprot);
+ }
+ }
+ }
+ if (struct.isSetSummarizerPattern()) {
+ oprot.writeString(struct.summarizerPattern);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, TSummaryRequest struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+ java.util.BitSet incoming = iprot.readBitSet(4);
+ if (incoming.get(0)) {
+ struct.tableId = iprot.readString();
+ struct.setTableIdIsSet(true);
+ }
+ if (incoming.get(1)) {
+ struct.bounds = new TRowRange();
+ struct.bounds.read(iprot);
+ struct.setBoundsIsSet(true);
+ }
+ if (incoming.get(2)) {
+ {
+ org.apache.thrift.protocol.TList _list127 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+ struct.summarizers = new java.util.ArrayList<TSummarizerConfiguration>(_list127.size);
+ TSummarizerConfiguration _elem128;
+ for (int _i129 = 0; _i129 < _list127.size; ++_i129)
+ {
+ _elem128 = new TSummarizerConfiguration();
+ _elem128.read(iprot);
+ struct.summarizers.add(_elem128);
+ }
+ }
+ struct.setSummarizersIsSet(true);
+ }
+ if (incoming.get(3)) {
+ struct.summarizerPattern = iprot.readString();
+ struct.setSummarizerPatternIsSet(true);
+ }
+ }
+ }
+
+ private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
+ return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+ }
+ private static void unusedMethod() {}
+}
+
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
index 6e5728a..4808da3 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
@@ -97,12 +97,12 @@ public class BloomFilterLayer {
private boolean closed = false;
private long length = -1;
- Writer(FileSKVWriter writer, AccumuloConfiguration acuconf) {
+ Writer(FileSKVWriter writer, AccumuloConfiguration acuconf, boolean useAccumuloStart) {
this.writer = writer;
- initBloomFilter(acuconf);
+ initBloomFilter(acuconf, useAccumuloStart);
}
- private synchronized void initBloomFilter(AccumuloConfiguration acuconf) {
+ private synchronized void initBloomFilter(AccumuloConfiguration acuconf, boolean useAccumuloStart) {
numKeys = acuconf.getCount(Property.TABLE_BLOOM_SIZE);
// vector size should be <code>-kn / (ln(1 - c^(1/k)))</code> bits for
@@ -121,7 +121,9 @@ public class BloomFilterLayer {
String context = acuconf.get(Property.TABLE_CLASSPATH);
String classname = acuconf.get(Property.TABLE_BLOOM_KEY_FUNCTOR);
Class<? extends KeyFunctor> clazz;
- if (context != null && !context.equals(""))
+ if (!useAccumuloStart)
+ clazz = Writer.class.getClassLoader().loadClass(classname).asSubclass(KeyFunctor.class);
+ else if (context != null && !context.equals(""))
clazz = AccumuloVFSClassLoader.getContextManager().loadClass(context, classname, KeyFunctor.class);
else
clazz = AccumuloVFSClassLoader.loadClass(classname, KeyFunctor.class);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
index c7d8248..e36b30f 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
@@ -23,6 +23,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.file.map.MapFileOperations;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.summary.SummaryWriter;
import org.apache.hadoop.fs.Path;
class DispatchingFileFactory extends FileOperations {
@@ -73,10 +74,10 @@ class DispatchingFileFactory extends FileOperations {
protected FileSKVWriter openWriter(OpenWriterOperation options) throws IOException {
FileSKVWriter writer = findFileFactory(options).openWriter(options);
if (options.getTableConfiguration().getBoolean(Property.TABLE_BLOOM_ENABLED)) {
- return new BloomFilterLayer.Writer(writer, options.getTableConfiguration());
- } else {
- return writer;
+ writer = new BloomFilterLayer.Writer(writer, options.getTableConfiguration(), options.isAccumuloStartEnabled());
}
+
+ return SummaryWriter.wrap(writer, options.getTableConfiguration(), options.isAccumuloStartEnabled());
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index 10bb784..67757bc 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -292,6 +292,7 @@ public abstract class FileOperations {
NeedsFileOrOuputStream<OpenWriterOperationBuilder> {
private String compression;
private FSDataOutputStream outputStream;
+ private boolean enableAccumuloStart = true;
@Override
public NeedsTableConfiguration<OpenWriterOperationBuilder> forOutputStream(String extenstion, FSDataOutputStream outputStream, Configuration fsConf) {
@@ -301,6 +302,16 @@ public abstract class FileOperations {
return this;
}
+ public boolean isAccumuloStartEnabled() {
+ return enableAccumuloStart;
+ }
+
+ @Override
+ public OpenWriterOperation setAccumuloStartEnabled(boolean enableAccumuloStart) {
+ this.enableAccumuloStart = enableAccumuloStart;
+ return this;
+ }
+
@Override
public OpenWriterOperation withCompression(String compression) {
this.compression = compression;
@@ -337,6 +348,13 @@ public abstract class FileOperations {
/** Set the compression type. */
public OpenWriterOperationBuilder withCompression(String compression);
+ /**
+   * Classes may be instantiated as part of a write operation. For example, if BloomFilters, Samplers, or Summarizers are used, then classes are loaded. When
+   * running in a tserver, Accumulo Start should be used to load classes. When running in a client process, Accumulo Start should not be used. This method
+   * makes it possible to specify whether Accumulo Start should be used to load classes. Calling this method is optional and the default is true.
+ */
+ public OpenWriterOperationBuilder setAccumuloStartEnabled(boolean enableAccumuloStart);
+
/** Construct the writer. */
public FileSKVWriter build() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
index 00ebb7a..d17528c 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
@@ -30,6 +30,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.accumulo.core.summary.SummaryReader;
import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.accumulo.start.spi.KeywordExecutable;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
@@ -58,6 +59,8 @@ public class PrintInfo implements KeywordExecutable {
boolean histogram = false;
@Parameter(names = {"--useSample"}, description = "Use sample data for --dump, --vis, --histogram options")
boolean useSample = false;
+ @Parameter(names = {"--summary"}, description = "Print summary data in file")
+ boolean printSummary = false;
@Parameter(names = {"--keyStats"}, description = "print key length statistics for index and all data")
boolean keyStats = false;
@Parameter(description = " <file> { <file> ... }")
@@ -210,6 +213,10 @@ public class PrintInfo implements KeywordExecutable {
}
}
+ if (opts.printSummary) {
+ SummaryReader.print(iter, System.out);
+ }
+
iter.close();
if (opts.vis || opts.hash) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 96d31ce..ec721ba 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -88,7 +88,7 @@ public class RFileOperations extends FileOperations {
Sampler sampler = null;
if (samplerConfig != null) {
- sampler = SamplerFactory.newSampler(samplerConfig, acuconf);
+ sampler = SamplerFactory.newSampler(samplerConfig, acuconf, options.isAccumuloStartEnabled());
}
String compression = options.getCompression();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataScanner.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataScanner.java
new file mode 100644
index 0000000..7e92b64
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataScanner.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.metadata.schema;
+
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.FetchedColumns;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
+
/**
 * Fluent builder for reading tablet metadata. A scan source (a {@link Scanner} or a {@link ClientContext}) is chosen first, then the metadata section to read
 * (root table, metadata table, or one user table's tablets), then which columns to fetch, and finally {@link ColumnOptions#build()} produces a lazy
 * {@code Iterable<TabletMetadata>}.
 */
public class MetadataScanner {

  /** First builder step: choose where metadata entries are read from. */
  public static interface SourceOptions {
    TableOptions from(Scanner scanner);

    TableOptions from(ClientContext ctx);
  }

  /** Second builder step: choose which metadata section / table id to scan. */
  public static interface TableOptions {
    ColumnOptions overRootTable();

    ColumnOptions overMetadataTable();

    ColumnOptions overUserTableId(String tableId);

    ColumnOptions overUserTableId(String tableId, Text startRow, Text endRow);
  }

  /** Final builder step: select the columns to fetch, then build the iterable. */
  public static interface ColumnOptions {
    public ColumnOptions fetchFiles();

    public ColumnOptions fetchLocation();

    public ColumnOptions fetchPrev();

    public ColumnOptions fetchLast();

    public Iterable<TabletMetadata> build() throws TableNotFoundException, AccumuloException, AccumuloSecurityException;
  }

  /**
   * Wraps a tablet metadata iterator and stops after returning the tablet whose extent contains {@code endRow}, so a bounded scan does not iterate past the
   * last tablet of interest.
   */
  private static class TabletMetadataIterator implements Iterator<TabletMetadata> {

    private boolean sawLast = false; // set once the tablet containing endRow has been returned
    private Iterator<TabletMetadata> iter;
    private Text endRow;

    TabletMetadataIterator(Iterator<TabletMetadata> source, Text endRow) {
      this.iter = source;
      this.endRow = endRow;
    }

    @Override
    public boolean hasNext() {
      return !sawLast && iter.hasNext();
    }

    @Override
    public TabletMetadata next() {
      if (sawLast) {
        throw new NoSuchElementException();
      }
      TabletMetadata next = iter.next();
      // the tablet whose extent contains endRow is the last one to return
      if (next.getExtent().contains(endRow)) {
        sawLast = true;
      }
      return next;
    }
  }

  /** Single mutable implementation backing all three builder interfaces. */
  private static class Builder implements SourceOptions, TableOptions, ColumnOptions {

    private List<Text> families = new ArrayList<>(); // column families to fetch
    private List<ColumnFQ> qualifiers = new ArrayList<>(); // fully qualified columns to fetch
    private Scanner scanner;
    private ClientContext ctx;
    private String table; // name of the metadata table to scan
    private String userTableId; // when set, restrict scan to this table's tablets
    private EnumSet<FetchedColumns> fetchedCols = EnumSet.noneOf(FetchedColumns.class);
    private Text startRow;
    private Text endRow;

    @Override
    public ColumnOptions fetchFiles() {
      fetchedCols.add(FetchedColumns.FILES);
      families.add(DataFileColumnFamily.NAME);
      return this;
    }

    @Override
    public ColumnOptions fetchLocation() {
      // a tablet's location may be recorded as current or future; fetch both
      fetchedCols.add(FetchedColumns.LOCATION);
      families.add(CurrentLocationColumnFamily.NAME);
      families.add(FutureLocationColumnFamily.NAME);
      return this;
    }

    @Override
    public ColumnOptions fetchPrev() {
      fetchedCols.add(FetchedColumns.PREV_ROW);
      qualifiers.add(PREV_ROW_COLUMN);
      return this;
    }

    @Override
    public ColumnOptions fetchLast() {
      fetchedCols.add(FetchedColumns.LAST);
      families.add(LastLocationColumnFamily.NAME);
      return this;
    }

    @Override
    public Iterable<TabletMetadata> build() throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
      // an isolated scanner is used so that each tablet's metadata row is read atomically
      if (ctx != null) {
        scanner = new IsolatedScanner(ctx.getConnector().createScanner(table, Authorizations.EMPTY));
      } else if (!(scanner instanceof IsolatedScanner)) {
        scanner = new IsolatedScanner(scanner);
      }

      if (userTableId != null) {
        // restrict the scan to the metadata rows of the requested table, starting at the tablet containing startRow
        scanner.setRange(new KeyExtent(userTableId, null, startRow).toMetadataRange());
      }

      for (Text fam : families) {
        scanner.fetchColumnFamily(fam);
      }

      for (ColumnFQ col : qualifiers) {
        col.fetch(scanner);
      }

      // when no columns were requested, the scanner fetches everything
      if (families.size() == 0 && qualifiers.size() == 0) {
        fetchedCols = EnumSet.allOf(FetchedColumns.class);
      }

      Iterable<TabletMetadata> tmi = TabletMetadata.convert(scanner, fetchedCols);

      if (endRow != null) {
        // create an iterable that will stop at the tablet which contains the endRow
        return new Iterable<TabletMetadata>() {
          @Override
          public Iterator<TabletMetadata> iterator() {
            return new TabletMetadataIterator(tmi.iterator(), endRow);
          }
        };
      } else {
        return tmi;
      }

    }

    @Override
    public ColumnOptions overRootTable() {
      this.table = RootTable.NAME;
      return this;
    }

    @Override
    public ColumnOptions overMetadataTable() {
      this.table = MetadataTable.NAME;
      return this;
    }

    @Override
    public ColumnOptions overUserTableId(String tableId) {
      // the root and metadata tables are not user tables; use overRootTable()/overMetadataTable() for those
      Preconditions.checkArgument(!tableId.equals(RootTable.ID) && !tableId.equals(MetadataTable.ID));

      this.table = MetadataTable.NAME;
      this.userTableId = tableId;
      return this;
    }

    @Override
    public TableOptions from(Scanner scanner) {
      this.scanner = scanner;
      return this;
    }

    @Override
    public TableOptions from(ClientContext ctx) {
      this.ctx = ctx;
      return this;
    }

    @Override
    public ColumnOptions overUserTableId(String tableId, Text startRow, Text endRow) {
      // NOTE(review): unlike the single-arg overload, this one does not reject the root/metadata table ids — confirm intentional
      this.table = MetadataTable.NAME;
      this.userTableId = tableId;
      this.startRow = startRow;
      this.endRow = endRow;
      return this;
    }

  }

  /** Entry point for the fluent builder chain. */
  public static SourceOptions builder() {
    return new Builder();
  }

}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
new file mode 100644
index 0000000..af5f814
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.metadata.schema;
+
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
+
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+import com.google.common.collect.Iterators;
+import com.google.common.net.HostAndPort;
+
/**
 * An in-memory view of a single tablet's metadata row. Instances are created by {@link #convertRow(Iterator, EnumSet)} from the key/values of one metadata
 * row; accessors guard against reading column groups that were not fetched by the originating scan.
 */
public class TabletMetadata {

  private String tableId;
  private Text prevEndRow;
  private Text endRow;
  private Location location; // current or future location, distinguished by LocationType
  private List<String> files;
  private EnumSet<FetchedColumns> fetchedColumns; // which column groups the originating scan fetched
  private KeyExtent extent; // lazily created from tableId/endRow/prevEndRow
  private Location last;

  public static enum LocationType {
    CURRENT, FUTURE, LAST
  }

  /** Column groups that a metadata scan may fetch. */
  public static enum FetchedColumns {
    LOCATION, PREV_ROW, FILES, LAST
  }

  /** A tablet server location: host:port string plus the server's session id. */
  public static class Location {
    private final String server;
    private final String session;
    private final LocationType lt;

    Location(String server, String session, LocationType lt) {
      this.server = server;
      this.session = session;
      this.lt = lt;
    }

    public HostAndPort getHostAndPort() {
      return HostAndPort.fromString(server);
    }

    public String getSession() {
      return session;
    }

    public LocationType getLocationType() {
      return lt;
    }
  }

  public String getTableId() {
    return tableId;
  }

  /** Lazily builds the extent; requires the prev row column to have been fetched. */
  public KeyExtent getExtent() {
    if (extent == null) {
      extent = new KeyExtent(getTableId(), getEndRow(), getPrevEndRow());
    }
    return extent;
  }

  public Text getPrevEndRow() {
    Preconditions.checkState(fetchedColumns.contains(FetchedColumns.PREV_ROW), "Requested prev row when it was not fetched");
    return prevEndRow;
  }

  // no fetch check needed: the end row is always derived from the row key itself in convertRow
  public Text getEndRow() {
    return endRow;
  }

  public Location getLocation() {
    Preconditions.checkState(fetchedColumns.contains(FetchedColumns.LOCATION), "Requested location when it was not fetched");
    return location;
  }

  public Location getLast() {
    Preconditions.checkState(fetchedColumns.contains(FetchedColumns.LAST), "Requested last when it was not fetched");
    return last;
  }

  public List<String> getFiles() {
    Preconditions.checkState(fetchedColumns.contains(FetchedColumns.FILES), "Requested files when it was not fetched");
    return files;
  }

  /**
   * Converts the entries of a single metadata row into a TabletMetadata.
   *
   * @param rowIter
   *          key/value entries belonging to exactly one metadata row
   * @param fetchedColumns
   *          which column groups the scan fetched; stored as-is and consulted by the accessors
   * @throws IllegalArgumentException
   *           if entries span more than one row, or contain more than one current/future location
   */
  public static TabletMetadata convertRow(Iterator<Entry<Key,Value>> rowIter, EnumSet<FetchedColumns> fetchedColumns) {
    Objects.requireNonNull(rowIter);

    TabletMetadata te = new TabletMetadata();

    Builder<String> filesBuilder = ImmutableList.builder();
    ByteSequence row = null;

    while (rowIter.hasNext()) {
      Entry<Key,Value> kv = rowIter.next();
      Key k = kv.getKey();
      Value v = kv.getValue();
      Text fam = k.getColumnFamily();

      if (row == null) {
        // first entry fixes the row; table id and end row are decoded from the row key
        row = k.getRowData();
        KeyExtent ke = new KeyExtent(k.getRow(), (Text) null);
        te.endRow = ke.getEndRow();
        te.tableId = ke.getTableId();
      } else if (!row.equals(k.getRowData())) {
        throw new IllegalArgumentException("Input contains more than one row : " + row + " " + k.getRowData());
      }

      if (PREV_ROW_COLUMN.hasColumns(k)) {
        te.prevEndRow = KeyExtent.decodePrevEndRow(v);
      }

      if (fam.equals(DataFileColumnFamily.NAME)) {
        filesBuilder.add(k.getColumnQualifier().toString());
      } else if (fam.equals(CurrentLocationColumnFamily.NAME)) {
        // a tablet may have at most one current or future location
        if (te.location != null) {
          throw new IllegalArgumentException("Input contains more than one location " + te.location + " " + v);
        }
        te.location = new Location(v.toString(), k.getColumnQualifierData().toString(), LocationType.CURRENT);
      } else if (fam.equals(FutureLocationColumnFamily.NAME)) {
        if (te.location != null) {
          throw new IllegalArgumentException("Input contains more than one location " + te.location + " " + v);
        }
        te.location = new Location(v.toString(), k.getColumnQualifierData().toString(), LocationType.FUTURE);
      } else if (fam.equals(LastLocationColumnFamily.NAME)) {
        te.last = new Location(v.toString(), k.getColumnQualifierData().toString(), LocationType.LAST);
      }
    }

    te.files = filesBuilder.build();
    te.fetchedColumns = fetchedColumns;
    return te;
  }

  /** Lazily converts each row of the scanner into a TabletMetadata; rows are grouped by a RowIterator. */
  public static Iterable<TabletMetadata> convert(Scanner input, EnumSet<FetchedColumns> fetchedColumns) {
    return new Iterable<TabletMetadata>() {
      @Override
      public Iterator<TabletMetadata> iterator() {
        RowIterator rowIter = new RowIterator(input);
        return Iterators.transform(rowIter, ri -> convertRow(ri, fetchedColumns));
      }
    };
  }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
index d3e2fe7..7c622b0 100644
--- a/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
@@ -17,8 +17,6 @@
package org.apache.accumulo.core.sample.impl;
-import static com.google.common.base.Preconditions.checkArgument;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -171,16 +169,6 @@ public class SamplerConfigurationImpl implements Writable {
return className + " " + options;
}
- public static void checkDisjoint(Map<String,String> props, SamplerConfiguration samplerConfiguration) {
- if (props.isEmpty() || samplerConfiguration == null) {
- return;
- }
-
- Map<String,String> sampleProps = new SamplerConfigurationImpl(samplerConfiguration).toTablePropertiesMap();
-
- checkArgument(Collections.disjoint(props.keySet(), sampleProps.keySet()), "Properties and derived sampler properties are not disjoint");
- }
-
public static TSamplerConfiguration toThrift(SamplerConfiguration samplerConfig) {
if (samplerConfig == null)
return null;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerFactory.java b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerFactory.java
index d70f3af..0cf75ae 100644
--- a/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerFactory.java
@@ -25,11 +25,13 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
public class SamplerFactory {
- public static Sampler newSampler(SamplerConfigurationImpl config, AccumuloConfiguration acuconf) throws IOException {
+ public static Sampler newSampler(SamplerConfigurationImpl config, AccumuloConfiguration acuconf, boolean useAccumuloStart) throws IOException {
String context = acuconf.get(Property.TABLE_CLASSPATH);
Class<? extends Sampler> clazz;
try {
+ if (!useAccumuloStart)
+ clazz = SamplerFactory.class.getClassLoader().loadClass(config.getClassName()).asSubclass(Sampler.class);
if (context != null && !context.equals(""))
clazz = AccumuloVFSClassLoader.getContextManager().loadClass(context, config.getClassName(), Sampler.class);
else
@@ -45,4 +47,8 @@ public class SamplerFactory {
throw new RuntimeException(e);
}
}
+
  /**
   * Convenience overload that loads the sampler class through Accumulo's VFS class loading (useAccumuloStart == true).
   *
   * NOTE(review): in the three-arg overload, the {@code if (!useAccumuloStart)} branch is not chained with {@code else}, so its class-loader assignment
   * appears to be immediately overwritten by the following if/else — verify intent.
   */
  public static Sampler newSampler(SamplerConfigurationImpl config, AccumuloConfiguration acuconf) throws IOException {
    return newSampler(config, acuconf, true);
  }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/security/TablePermission.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/TablePermission.java b/core/src/main/java/org/apache/accumulo/core/security/TablePermission.java
index a80be9a..d721ebc 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/TablePermission.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/TablePermission.java
@@ -33,11 +33,12 @@ public enum TablePermission {
BULK_IMPORT((byte) 4),
ALTER_TABLE((byte) 5),
GRANT((byte) 6),
- DROP_TABLE((byte) 7);
+ DROP_TABLE((byte) 7),
+ GET_SUMMARIES((byte) 8);
final private byte permID;
- final private static TablePermission mapping[] = new TablePermission[8];
+ final private static TablePermission mapping[] = new TablePermission[9];
static {
for (TablePermission perm : TablePermission.values())
mapping[perm.permID] = perm;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
new file mode 100644
index 0000000..8389051
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
@@ -0,0 +1,631 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.ServerClient;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.data.thrift.TRowRange;
+import org.apache.accumulo.core.data.thrift.TSummaries;
+import org.apache.accumulo.core.data.thrift.TSummaryRequest;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.metadata.schema.MetadataScanner;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.CancelFlagFuture;
+import org.apache.accumulo.core.util.CompletableFutureUtil;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
+import com.google.common.net.HostAndPort;
+
+/**
+ * This class implements using multiple tservers to gather summaries.
+ *
+ * Below is a rough outline of the RPC process.
+ *
+ * <ol>
+ * <li>Clients pick a random tserver and make an RPC to remotely execute {@link #gather(ExecutorService)}.
+ * <li> {@link #gather(ExecutorService)} will make RPC calls to multiple tservers to remotely execute {@link #processPartition(ExecutorService, int, int)}
+ * <li> {@link #processPartition(ExecutorService, int, int)} will make RPC calls to multiple tservers to remotely execute
+ * <li> {@link #processFiles(FileSystemResolver, Map, BlockCache, BlockCache, ExecutorService)}
+ * </ol>
+ */
+public class Gatherer {
+
+ private static final Logger log = LoggerFactory.getLogger(Gatherer.class);
+
+ private ClientContext ctx;
+ private String tableId;
+ private SummarizerFactory factory;
+ private Text startRow = null;
+ private Text endRow = null;
+ private Range clipRange;
+ private Predicate<SummarizerConfiguration> summarySelector;
+
+ private TSummaryRequest request;
+
+ private String summarizerPattern;
+
+ private Set<SummarizerConfiguration> summaries;
+
  /**
   * @param context
   *          client context used for metadata scans and tserver RPCs
   * @param request
   *          the thrift summary request to serve; supplies the table id, row bounds, summarizer configs, and an optional summarizer-selection regex
   * @param tableConfig
   *          configuration of the summarized table, used to construct summarizers
   */
  public Gatherer(ClientContext context, TSummaryRequest request, AccumuloConfiguration tableConfig) {
    this.ctx = context;
    this.tableId = request.tableId;
    this.startRow = ByteBufferUtil.toText(request.bounds.startRow);
    this.endRow = ByteBufferUtil.toText(request.bounds.endRow);
    // exclusive of startRow, inclusive of endRow
    this.clipRange = new Range(startRow, false, endRow, true);
    this.summaries = request.getSummarizers().stream().map(SummarizerConfigurationUtil::fromThrift).collect(Collectors.toSet());
    this.request = request;

    this.summarizerPattern = request.getSummarizerPattern();

    if (summarizerPattern != null) {
      Pattern pattern = Pattern.compile(summarizerPattern);
      // The way conf is converted to string below is documented in the API, so consider this when making changes!
      summarySelector = conf -> pattern.matcher(conf.getClassName() + " " + new TreeMap<>(conf.getOptions())).matches();
      if (!summaries.isEmpty()) {
        // a config matches if it matches the pattern OR was explicitly requested
        summarySelector = summarySelector.or(conf -> summaries.contains(conf));
      }
    } else if (!summaries.isEmpty()) {
      summarySelector = conf -> summaries.contains(conf);
    } else {
      // nothing specific requested, so select every configured summarizer
      summarySelector = conf -> true;
    }

    this.factory = new SummarizerFactory(tableConfig);
  }
+
  /** @return the thrift request this gatherer was constructed with */
  private TSummaryRequest getRequest() {
    return request;
  }
+
  /**
   * Determines which tablet server should process each selected file, grouping the files by that server.
   *
   * @param fileSelector
   *          only returns files that match this predicate
   * @return A map of the form : {@code map<tserver location, map<path, list<range>>>} . The ranges associated with a file represent the tablets that use the
   *         file.
   */
  private Map<String,Map<String,List<TRowRange>>> getFilesGroupedByLocation(Predicate<String> fileSelector) throws TableNotFoundException, AccumuloException,
      AccumuloSecurityException {

    Iterable<TabletMetadata> tmi = MetadataScanner.builder().from(ctx).overUserTableId(tableId, startRow, endRow).fetchFiles().fetchLocation().fetchLast()
        .fetchPrev().build();

    // get a subset of files
    Map<String,List<TabletMetadata>> files = new HashMap<>();
    for (TabletMetadata tm : tmi) {
      for (String file : tm.getFiles()) {
        if (fileSelector.test(file)) {
          // TODO push this filtering to server side and possibly use batch scanner
          files.computeIfAbsent(file, s -> new ArrayList<>()).add(tm);
        }
      }
    }

    // group by location, then file

    Map<String,Map<String,List<TRowRange>>> locations = new HashMap<>();

    List<String> tservers = null; // lazily fetched, only needed when a file has no tablet location at all

    for (Entry<String,List<TabletMetadata>> entry : files.entrySet()) {

      // prefer the minimum current location of any tablet using the file; fall back to the minimum last-known location
      String location = entry.getValue().stream().filter(tm -> tm.getLocation() != null) // filter tablets w/o a location
          .map(tm -> tm.getLocation().getHostAndPort().toString()) // convert to host:port strings
          .min(String::compareTo) // find minimum host:port
          .orElse(entry.getValue().stream().filter(tm -> tm.getLast() != null) // if no locations, then look at last locations
              .map(tm -> tm.getLast().getHostAndPort().toString()) // convert to host:port strings
              .min(String::compareTo).orElse(null)); // find minimum last location or return null

      if (location == null) {
        if (tservers == null) {
          tservers = ctx.getConnector().instanceOperations().getTabletServers();
          Collections.sort(tservers);
        }

        // When no location, the approach below will consistently choose the same tserver for the same file (as long as the set of tservers is stable).
        int idx = Math.abs(Hashing.murmur3_32().hashString(entry.getKey()).asInt()) % tservers.size();
        location = tservers.get(idx);
      }

      List<Range> merged = Range.mergeOverlapping(Lists.transform(entry.getValue(), tm -> tm.getExtent().toDataRange())); // merge contiguous ranges
      List<TRowRange> ranges = merged.stream().map(r -> toClippedExtent(r).toThrift()).collect(Collectors.toList()); // clip ranges to queried range

      locations.computeIfAbsent(location, s -> new HashMap<>()).put(entry.getKey(), ranges);
    }

    return locations;
  }
+
+ private <K,V> Iterable<Map<K,V>> partition(Map<K,V> map, int max) {
+
+ if (map.size() < max) {
+ return Collections.singletonList(map);
+ }
+
+ return new Iterable<Map<K,V>>() {
+ @Override
+ public Iterator<Map<K,V>> iterator() {
+ Iterator<Entry<K,V>> esi = map.entrySet().iterator();
+
+ return new Iterator<Map<K,V>>() {
+ @Override
+ public boolean hasNext() {
+ return esi.hasNext();
+ }
+
+ @Override
+ public Map<K,V> next() {
+ Map<K,V> workingMap = new HashMap<>(max);
+ while (esi.hasNext() && workingMap.size() < max) {
+ Entry<K,V> entry = esi.next();
+ workingMap.put(entry.getKey(), entry.getValue());
+ }
+ return workingMap;
+ }
+ };
+ }
+ };
+ }
+
  /**
   * Result of processing a batch of files: the merged summaries plus the set of files that could not be processed (e.g. because a tserver was unreachable)
   * and need to be retried.
   */
  private static class ProcessedFiles {
    final SummaryCollection summaries;
    final Set<String> failedFiles;

    public ProcessedFiles() {
      this.summaries = new SummaryCollection();
      this.failedFiles = new HashSet<>();
    }

    /** Creates a result seeded with previously computed summaries and no failed files. */
    public ProcessedFiles(SummaryCollection summaries, SummarizerFactory factory) {
      this();
      this.summaries.merge(summaries, factory);
    }

    /** Combines two results: failed-file sets are unioned and summaries are merged. */
    static ProcessedFiles merge(ProcessedFiles pf1, ProcessedFiles pf2, SummarizerFactory factory) {
      ProcessedFiles ret = new ProcessedFiles();
      ret.failedFiles.addAll(pf1.failedFiles);
      ret.failedFiles.addAll(pf2.failedFiles);
      ret.summaries.merge(pf1.summaries, factory);
      ret.summaries.merge(pf2.summaries, factory);
      return ret;
    }
  }
+
  /**
   * Asks a single tablet server to compute summaries for a set of files, in chunks of at most 500 files per RPC session. Transport failures mark the affected
   * files as failed in the result rather than throwing, so they can be retried; other thrift failures propagate as runtime exceptions.
   */
  private class FilesProcessor implements Supplier<ProcessedFiles> {

    HostAndPort location; // tserver to contact
    Map<String,List<TRowRange>> allFiles; // file -> row ranges of the tablets using it
    private TInfo tinfo;
    private AtomicBoolean cancelFlag; // shared flag checked between RPC polls

    public FilesProcessor(TInfo tinfo, HostAndPort location, Map<String,List<TRowRange>> allFiles, AtomicBoolean cancelFlag) {
      this.location = location;
      this.allFiles = allFiles;
      this.tinfo = tinfo;
      this.cancelFlag = cancelFlag;
    }

    @Override
    public ProcessedFiles get() {
      ProcessedFiles pfiles = new ProcessedFiles();

      Client client = null;
      try {
        client = ThriftUtil.getTServerClient(location, ctx);
        // partition files into smaller chunks so that not too many are sent to a tserver at once
        for (Map<String,List<TRowRange>> files : partition(allFiles, 500)) {
          if (pfiles.failedFiles.size() > 0) {
            // there was a previous failure on this tserver, so just fail the rest of the files
            pfiles.failedFiles.addAll(files.keySet());
            continue;
          }

          try {
            // start a summary session on the tserver and poll until it finishes or we are canceled
            // (the misspelled RPC name comes from the thrift definition)
            TSummaries tSums = client.startGetSummariesFromFiles(tinfo, ctx.rpcCreds(), getRequest(), files);
            while (!tSums.finished && !cancelFlag.get()) {
              tSums = client.contiuneGetSummaries(tinfo, tSums.sessionId);
            }

            pfiles.summaries.merge(new SummaryCollection(tSums), factory);
          } catch (TApplicationException tae) {
            // server-side application error; not retryable here
            throw new RuntimeException(tae);
          } catch (TTransportException e) {
            // communication failure; mark this chunk failed so it can be retried elsewhere
            pfiles.failedFiles.addAll(files.keySet());
            continue;
          } catch (TException e) {
            throw new RuntimeException(e);
          }

        }
      } catch (TTransportException e1) {
        // could not even obtain a client; fail every file destined for this tserver
        pfiles.failedFiles.addAll(allFiles.keySet());
      } finally {
        ThriftUtil.returnClient(client);
      }

      if (cancelFlag.get()) {
        throw new RuntimeException("Operation canceled");
      }

      return pfiles;
    }
  }
+
+ private class PartitionFuture implements Future<SummaryCollection> {
+
+ private CompletableFuture<ProcessedFiles> future;
+ private int modulus;
+ private int remainder;
+ private ExecutorService execSrv;
+ private TInfo tinfo;
+ private AtomicBoolean cancelFlag = new AtomicBoolean(false);
+
+ PartitionFuture(TInfo tinfo, ExecutorService execSrv, int modulus, int remainder) {
+ this.tinfo = tinfo;
+ this.execSrv = execSrv;
+ this.modulus = modulus;
+ this.remainder = remainder;
+ }
+
    /**
     * Starts (or, after failures, restarts) asynchronous processing of this partition's files. A file belongs to this partition when
     * {@code hash(file) % modulus == remainder}. When {@code previousWork} is non-null, only its failed files are reprocessed and its summaries are carried
     * forward.
     */
    private synchronized void initiateProcessing(ProcessedFiles previousWork) {
      try {
        Predicate<String> fileSelector = file -> Math.abs(Hashing.murmur3_32().hashString(file).asInt()) % modulus == remainder;
        if (previousWork != null) {
          // on retry, only consider files that failed in the previous round
          fileSelector = fileSelector.and(file -> previousWork.failedFiles.contains(file));
        }
        Map<String,Map<String,List<TRowRange>>> filesGBL;
        filesGBL = getFilesGroupedByLocation(fileSelector);

        List<CompletableFuture<ProcessedFiles>> futures = new ArrayList<>();
        if (previousWork != null) {
          // carry forward the summaries already computed in the previous round
          futures.add(CompletableFuture.completedFuture(new ProcessedFiles(previousWork.summaries, factory)));
        }

        for (Entry<String,Map<String,List<TRowRange>>> entry : filesGBL.entrySet()) {
          HostAndPort location = HostAndPort.fromString(entry.getKey());
          Map<String,List<TRowRange>> allFiles = entry.getValue();

          // one async task per tserver, all sharing the cancel flag
          futures.add(CompletableFuture.supplyAsync(new FilesProcessor(tinfo, location, allFiles, cancelFlag), execSrv));
        }

        future = CompletableFutureUtil.merge(futures, (pf1, pf2) -> ProcessedFiles.merge(pf1, pf2, factory), ProcessedFiles::new);

        // when all processing is done, check for failed files... and if found starting processing again
        future.thenRun(() -> updateFuture());
      } catch (Exception e) {
        future = CompletableFuture.completedFuture(new ProcessedFiles());
        // force future to have this exception
        future.obtrudeException(e);
      }
    }
+
+ private ProcessedFiles _get() {
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private synchronized CompletableFuture<ProcessedFiles> updateFuture() {
+ if (future.isDone()) {
+ if (!future.isCancelled() && !future.isCompletedExceptionally()) {
+ ProcessedFiles pf = _get();
+ if (pf.failedFiles.size() > 0) {
+ initiateProcessing(pf);
+ }
+ }
+ }
+
+ return future;
+ }
+
+ synchronized void initiateProcessing() {
+ Preconditions.checkState(future == null);
+ initiateProcessing(null);
+ }
+
+ @Override
+ public synchronized boolean cancel(boolean mayInterruptIfRunning) {
+ boolean canceled = future.cancel(mayInterruptIfRunning);
+ if (canceled) {
+ cancelFlag.set(true);
+ }
+ return canceled;
+ }
+
+ @Override
+ public synchronized boolean isCancelled() {
+ return future.isCancelled();
+ }
+
+ @Override
+ public synchronized boolean isDone() {
+ updateFuture();
+ if (future.isDone()) {
+ if (future.isCancelled() || future.isCompletedExceptionally()) {
+ return true;
+ }
+
+ ProcessedFiles pf = _get();
+ if (pf.failedFiles.size() == 0) {
+ return true;
+ } else {
+ updateFuture();
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public SummaryCollection get() throws InterruptedException, ExecutionException {
+ CompletableFuture<ProcessedFiles> futureRef = updateFuture();
+ ProcessedFiles processedFiles = futureRef.get();
+ while (processedFiles.failedFiles.size() > 0) {
+ futureRef = updateFuture();
+ processedFiles = futureRef.get();
+ }
+ return processedFiles.summaries;
+ }
+
+ @Override
+ public SummaryCollection get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ long nanosLeft = unit.toNanos(timeout);
+ long t1, t2;
+ CompletableFuture<ProcessedFiles> futureRef = updateFuture();
+ t1 = System.nanoTime();
+ ProcessedFiles processedFiles = futureRef.get(Long.max(1, nanosLeft), TimeUnit.NANOSECONDS);
+ t2 = System.nanoTime();
+ nanosLeft -= (t2 - t1);
+ while (processedFiles.failedFiles.size() > 0) {
+ futureRef = updateFuture();
+ t1 = System.nanoTime();
+ processedFiles = futureRef.get(Long.max(1, nanosLeft), TimeUnit.NANOSECONDS);
+ t2 = System.nanoTime();
+ nanosLeft -= (t2 - t1);
+ }
+ return processedFiles.summaries;
+ }
+
+ }
+
+ /**
+ * This methods reads a subset of file paths into memory and groups them by location. Then it request sumaries for files from each location/tablet server.
+ */
+ public Future<SummaryCollection> processPartition(ExecutorService execSrv, int modulus, int remainder) {
+ PartitionFuture future = new PartitionFuture(Tracer.traceInfo(), execSrv, modulus, remainder);
+ future.initiateProcessing();
+ return future;
+ }
+
+ public static interface FileSystemResolver {
+ FileSystem get(Path file);
+ }
+
+ /**
+ * This method will read summaries from a set of files.
+ */
+ public Future<SummaryCollection> processFiles(FileSystemResolver volMgr, Map<String,List<TRowRange>> files, BlockCache summaryCache, BlockCache indexCache,
+ ExecutorService srp) {
+ List<CompletableFuture<SummaryCollection>> futures = new ArrayList<>();
+ for (Entry<String,List<TRowRange>> entry : files.entrySet()) {
+ futures.add(CompletableFuture.supplyAsync(() -> {
+ List<RowRange> rrl = Lists.transform(entry.getValue(), RowRange::new);
+ return getSummaries(volMgr, entry.getKey(), rrl, summaryCache, indexCache);
+ }, srp));
+ }
+
+ return CompletableFutureUtil.merge(futures, (sc1, sc2) -> SummaryCollection.merge(sc1, sc2, factory), SummaryCollection::new);
+ }
+
+ private int countFiles() throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+ // TODO use a batch scanner + iterator to parallelize counting files
+ Iterable<TabletMetadata> tmi = MetadataScanner.builder().from(ctx).overUserTableId(tableId, startRow, endRow).fetchFiles().fetchPrev().build();
+ return (int) StreamSupport.stream(tmi.spliterator(), false).mapToInt(tm -> tm.getFiles().size()).sum();
+ }
+
  /**
   * A task that asks the tablet servers to gather summaries for one partition of the table's
   * files ({@code remainder} mod {@code modulus}). Runs on the caller-supplied executor and
   * blocks polling the server until the server-side gather finishes or {@code cancelFlag} is
   * set.
   */
  private class GatherRequest implements Supplier<SummaryCollection> {

    private int remainder;
    private int modulus;
    private TInfo tinfo;
    // shared cancellation signal; checked between polls so a cancel stops the RPC loop
    private AtomicBoolean cancelFlag;

    GatherRequest(TInfo tinfo, int remainder, int modulus, AtomicBoolean cancelFlag) {
      this.remainder = remainder;
      this.modulus = modulus;
      this.tinfo = tinfo;
      this.cancelFlag = cancelFlag;
    }

    @Override
    public SummaryCollection get() {
      TSummaryRequest req = getRequest();

      TSummaries tSums;
      try {
        tSums = ServerClient.execute(ctx, new TabletClientService.Client.Factory(), client -> {
          // start the server-side gather, then poll the session until it reports finished
          TSummaries tsr = client.startGetSummariesForPartition(tinfo, ctx.rpcCreds(), req, modulus, remainder);
          while (!tsr.finished && !cancelFlag.get()) {
            // (sic) "contiune" — method name comes from the thrift definition
            tsr = client.contiuneGetSummaries(tinfo, tsr.sessionId);
          }
          return tsr;
        });
      } catch (AccumuloException | AccumuloSecurityException e) {
        throw new RuntimeException(e);
      }

      if (cancelFlag.get()) {
        // canceled mid-gather; the partial result must not be treated as complete
        throw new RuntimeException("Operation canceled");
      }

      return new SummaryCollection(tSums);
    }
  }
+
+ public Future<SummaryCollection> gather(ExecutorService es) {
+ int numFiles;
+ try {
+ numFiles = countFiles();
+ } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
+ throw new RuntimeException(e);
+ }
+
+ log.debug("Gathering summaries from {} files", numFiles);
+
+ if (numFiles == 0) {
+ return CompletableFuture.completedFuture(new SummaryCollection());
+ }
+
+ // have each tablet server process ~100K files
+ int numRequest = Math.max(numFiles / 100_000, 1);
+
+ List<CompletableFuture<SummaryCollection>> futures = new ArrayList<>();
+
+ AtomicBoolean cancelFlag = new AtomicBoolean(false);
+
+ TInfo tinfo = Tracer.traceInfo();
+ for (int i = 0; i < numRequest; i++) {
+ futures.add(CompletableFuture.supplyAsync(new GatherRequest(tinfo, i, numRequest, cancelFlag), es));
+ }
+
+ Future<SummaryCollection> future = CompletableFutureUtil.merge(futures, (sc1, sc2) -> SummaryCollection.merge(sc1, sc2, factory), SummaryCollection::new);
+ return new CancelFlagFuture<>(future, cancelFlag);
+ }
+
+ private static Text removeTrailingZeroFromRow(Key k) {
+ if (k != null) {
+ Text t = new Text();
+ ByteSequence row = k.getRowData();
+ Preconditions.checkArgument(row.length() >= 1 && row.byteAt(row.length() - 1) == 0);
+ t.set(row.getBackingArray(), row.offset(), row.length() - 1);
+ return t;
+ } else {
+ return null;
+ }
+ }
+
+ private RowRange toClippedExtent(Range r) {
+ r = clipRange.clip(r);
+
+ Text startRow = removeTrailingZeroFromRow(r.getStartKey());
+ Text endRow = removeTrailingZeroFromRow(r.getEndKey());
+
+ return new RowRange(startRow, endRow);
+ }
+
+ public static class RowRange {
+ private Text startRow;
+ private Text endRow;
+
+ public RowRange(KeyExtent ke) {
+ this.startRow = ke.getPrevEndRow();
+ this.endRow = ke.getEndRow();
+ }
+
+ public RowRange(TRowRange trr) {
+ this.startRow = ByteBufferUtil.toText(trr.startRow);
+ this.endRow = ByteBufferUtil.toText(trr.endRow);
+ }
+
+ public RowRange(Text startRow, Text endRow) {
+ this.startRow = startRow;
+ this.endRow = endRow;
+ }
+
+ public Range toRange() {
+ return new Range(startRow, false, endRow, true);
+ }
+
+ public TRowRange toThrift() {
+ return new TRowRange(TextUtil.getByteBuffer(startRow), TextUtil.getByteBuffer(endRow));
+ }
+
+ public Text getStartRow() {
+ return startRow;
+ }
+
+ public Text getEndRow() {
+ return endRow;
+ }
+
+ public String toString() {
+ return startRow + " " + endRow;
+ }
+ }
+
+ private SummaryCollection getSummaries(FileSystemResolver volMgr, String file, List<RowRange> ranges, BlockCache summaryCache, BlockCache indexCache) {
+ Path path = new Path(file);
+ Configuration conf = CachedConfiguration.getInstance();
+ return SummaryReader.load(volMgr.get(path), conf, ctx.getConfiguration(), factory, path, summarySelector, summaryCache, indexCache).getSummaries(ranges);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/SummarizerConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummarizerConfigurationUtil.java b/core/src/main/java/org/apache/accumulo/core/summary/SummarizerConfigurationUtil.java
new file mode 100644
index 0000000..a67b8c2
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummarizerConfigurationUtil.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration.Builder;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.thrift.TSummarizerConfiguration;
+
+public class SummarizerConfigurationUtil {
+
+ public static Map<String,String> toTablePropertiesMap(List<SummarizerConfiguration> summarizers) {
+ if (summarizers.size() == 0) {
+ return Collections.emptyMap();
+ }
+
+ Map<String,String> props = new HashMap<>();
+
+ for (SummarizerConfiguration sconf : summarizers) {
+ String cid = sconf.getPropertyId();
+ String prefix = Property.TABLE_SUMMARIZER_PREFIX.getKey() + cid;
+
+ if (props.containsKey(prefix)) {
+ throw new IllegalArgumentException("Duplicate summarizer config id : " + cid);
+ }
+
+ props.put(prefix, sconf.getClassName());
+ Set<Entry<String,String>> es = sconf.getOptions().entrySet();
+ StringBuilder sb = new StringBuilder(prefix + ".opt.");
+ int resetLen = sb.length();
+ for (Entry<String,String> entry : es) {
+ sb.append(entry.getKey());
+ props.put(sb.toString(), entry.getValue());
+ sb.setLength(resetLen);
+ }
+ }
+
+ return props;
+ }
+
+ public static List<SummarizerConfiguration> getSummarizerConfigs(Iterable<Entry<String,String>> props) {
+ TreeMap<String,String> filteredMap = new TreeMap<>();
+ for (Entry<String,String> entry : props) {
+ if (entry.getKey().startsWith(Property.TABLE_SUMMARIZER_PREFIX.getKey())) {
+ filteredMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ return getSummarizerConfigsFiltered(filteredMap);
+ }
+
+ public static List<SummarizerConfiguration> getSummarizerConfigs(AccumuloConfiguration aconf) {
+ Map<String,String> sprops = aconf.getAllPropertiesWithPrefix(Property.TABLE_SUMMARIZER_PREFIX);
+ return getSummarizerConfigsFiltered(new TreeMap<>(sprops));
+ }
+
+ private static List<SummarizerConfiguration> getSummarizerConfigsFiltered(SortedMap<String,String> sprops) {
+ if (sprops.size() == 0) {
+ return Collections.emptyList();
+ }
+
+ SummarizerConfiguration.Builder builder = null;
+
+ List<SummarizerConfiguration> configs = new ArrayList<>();
+
+ final int preLen = Property.TABLE_SUMMARIZER_PREFIX.getKey().length();
+ for (Entry<String,String> entry : sprops.entrySet()) {
+ String k = entry.getKey().substring(preLen);
+
+ String[] tokens = k.split("\\.");
+
+ String id = tokens[0];
+ if (tokens.length == 1) {
+ if (builder != null) {
+ configs.add(builder.build());
+ }
+
+ builder = SummarizerConfiguration.builder(entry.getValue()).setPropertyId(id);
+
+ } else if (tokens.length == 3 || tokens[1].equals("opt")) {
+ builder.addOption(tokens[2], entry.getValue());
+ } else {
+ throw new IllegalArgumentException("Unable to parse summarizer property : " + k);
+ }
+ }
+
+ configs.add(builder.build());
+
+ return configs;
+ }
+
+ public static TSummarizerConfiguration toThrift(SummarizerConfiguration sc) {
+ return new TSummarizerConfiguration(sc.getClassName(), sc.getOptions(), sc.getPropertyId());
+ }
+
+ public static SummarizerConfiguration fromThrift(TSummarizerConfiguration config) {
+ Builder builder = SummarizerConfiguration.builder(config.getClassname());
+ builder.setPropertyId(config.getConfigId());
+ builder.addOptions(config.getOptions());
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/SummarizerFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummarizerFactory.java b/core/src/main/java/org/apache/accumulo/core/summary/SummarizerFactory.java
new file mode 100644
index 0000000..bba41b5
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummarizerFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+
+public class SummarizerFactory {
+ private ClassLoader classloader;
+ private String context;
+
+ public SummarizerFactory() {
+ this.classloader = SummarizerFactory.class.getClassLoader();
+ }
+
+ public SummarizerFactory(ClassLoader classloader) {
+ this.classloader = classloader;
+ }
+
+ public SummarizerFactory(AccumuloConfiguration tableConfig) {
+ this.context = tableConfig.get(Property.TABLE_CLASSPATH);
+ }
+
+ private Summarizer newSummarizer(String classname) throws ClassNotFoundException, IOException, InstantiationException, IllegalAccessException {
+ if (classloader != null) {
+ return classloader.loadClass(classname).asSubclass(Summarizer.class).newInstance();
+ } else {
+ if (context != null && !context.equals(""))
+ return AccumuloVFSClassLoader.getContextManager().loadClass(context, classname, Summarizer.class).newInstance();
+ else
+ return AccumuloVFSClassLoader.loadClass(classname, Summarizer.class).newInstance();
+ }
+ }
+
+ public Summarizer getSummarizer(SummarizerConfiguration conf) {
+ try {
+ Summarizer summarizer = newSummarizer(conf.getClassName());
+ return summarizer;
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}