You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2012/10/22 20:55:18 UTC
[6/7] add describe_splits_ex providing improved split size estimate
patch by Piotr Kolaczkowski; reviewed by jbellis for CASSANDRA-4803
http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/interface/thrift/gen-java/org/apache/cassandra/thrift/CfSplit.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfSplit.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfSplit.java
new file mode 100644
index 0000000..2519f9f
--- /dev/null
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfSplit.java
@@ -0,0 +1,549 @@
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.cassandra.thrift;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents input splits used by hadoop ColumnFamilyRecordReaders
+ */
+public class CfSplit implements org.apache.thrift.TBase<CfSplit, CfSplit._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("CfSplit");
+
+ private static final org.apache.thrift.protocol.TField START_TOKEN_FIELD_DESC = new org.apache.thrift.protocol.TField("start_token", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField END_TOKEN_FIELD_DESC = new org.apache.thrift.protocol.TField("end_token", org.apache.thrift.protocol.TType.STRING, (short)2);
+ private static final org.apache.thrift.protocol.TField ROW_COUNT_FIELD_DESC = new org.apache.thrift.protocol.TField("row_count", org.apache.thrift.protocol.TType.I64, (short)3);
+
+ public String start_token; // required
+ public String end_token; // required
+ public long row_count; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ START_TOKEN((short)1, "start_token"),
+ END_TOKEN((short)2, "end_token"),
+ ROW_COUNT((short)3, "row_count");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+   * Find the _Fields constant that matches fieldId, or null if it is not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // START_TOKEN
+ return START_TOKEN;
+ case 2: // END_TOKEN
+ return END_TOKEN;
+ case 3: // ROW_COUNT
+ return ROW_COUNT;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+   * Find the _Fields constant that matches name, or null if it is not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ private static final int __ROW_COUNT_ISSET_ID = 0;
+ private BitSet __isset_bit_vector = new BitSet(1);
+
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.START_TOKEN, new org.apache.thrift.meta_data.FieldMetaData("start_token", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.END_TOKEN, new org.apache.thrift.meta_data.FieldMetaData("end_token", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.ROW_COUNT, new org.apache.thrift.meta_data.FieldMetaData("row_count", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CfSplit.class, metaDataMap);
+ }
+
+ public CfSplit() {
+ }
+
+ public CfSplit(
+ String start_token,
+ String end_token,
+ long row_count)
+ {
+ this();
+ this.start_token = start_token;
+ this.end_token = end_token;
+ this.row_count = row_count;
+ setRow_countIsSet(true);
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public CfSplit(CfSplit other) {
+ __isset_bit_vector.clear();
+ __isset_bit_vector.or(other.__isset_bit_vector);
+ if (other.isSetStart_token()) {
+ this.start_token = other.start_token;
+ }
+ if (other.isSetEnd_token()) {
+ this.end_token = other.end_token;
+ }
+ this.row_count = other.row_count;
+ }
+
+ public CfSplit deepCopy() {
+ return new CfSplit(this);
+ }
+
+ @Override
+ public void clear() {
+ this.start_token = null;
+ this.end_token = null;
+ setRow_countIsSet(false);
+ this.row_count = 0;
+ }
+
+ public String getStart_token() {
+ return this.start_token;
+ }
+
+ public CfSplit setStart_token(String start_token) {
+ this.start_token = start_token;
+ return this;
+ }
+
+ public void unsetStart_token() {
+ this.start_token = null;
+ }
+
+ /** Returns true if field start_token is set (has been assigned a value) and false otherwise */
+ public boolean isSetStart_token() {
+ return this.start_token != null;
+ }
+
+ public void setStart_tokenIsSet(boolean value) {
+ if (!value) {
+ this.start_token = null;
+ }
+ }
+
+ public String getEnd_token() {
+ return this.end_token;
+ }
+
+ public CfSplit setEnd_token(String end_token) {
+ this.end_token = end_token;
+ return this;
+ }
+
+ public void unsetEnd_token() {
+ this.end_token = null;
+ }
+
+ /** Returns true if field end_token is set (has been assigned a value) and false otherwise */
+ public boolean isSetEnd_token() {
+ return this.end_token != null;
+ }
+
+ public void setEnd_tokenIsSet(boolean value) {
+ if (!value) {
+ this.end_token = null;
+ }
+ }
+
+ public long getRow_count() {
+ return this.row_count;
+ }
+
+ public CfSplit setRow_count(long row_count) {
+ this.row_count = row_count;
+ setRow_countIsSet(true);
+ return this;
+ }
+
+ public void unsetRow_count() {
+ __isset_bit_vector.clear(__ROW_COUNT_ISSET_ID);
+ }
+
+ /** Returns true if field row_count is set (has been assigned a value) and false otherwise */
+ public boolean isSetRow_count() {
+ return __isset_bit_vector.get(__ROW_COUNT_ISSET_ID);
+ }
+
+ public void setRow_countIsSet(boolean value) {
+ __isset_bit_vector.set(__ROW_COUNT_ISSET_ID, value);
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case START_TOKEN:
+ if (value == null) {
+ unsetStart_token();
+ } else {
+ setStart_token((String)value);
+ }
+ break;
+
+ case END_TOKEN:
+ if (value == null) {
+ unsetEnd_token();
+ } else {
+ setEnd_token((String)value);
+ }
+ break;
+
+ case ROW_COUNT:
+ if (value == null) {
+ unsetRow_count();
+ } else {
+ setRow_count((Long)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case START_TOKEN:
+ return getStart_token();
+
+ case END_TOKEN:
+ return getEnd_token();
+
+ case ROW_COUNT:
+ return Long.valueOf(getRow_count());
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case START_TOKEN:
+ return isSetStart_token();
+ case END_TOKEN:
+ return isSetEnd_token();
+ case ROW_COUNT:
+ return isSetRow_count();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof CfSplit)
+ return this.equals((CfSplit)that);
+ return false;
+ }
+
+ public boolean equals(CfSplit that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_start_token = true && this.isSetStart_token();
+ boolean that_present_start_token = true && that.isSetStart_token();
+ if (this_present_start_token || that_present_start_token) {
+ if (!(this_present_start_token && that_present_start_token))
+ return false;
+ if (!this.start_token.equals(that.start_token))
+ return false;
+ }
+
+ boolean this_present_end_token = true && this.isSetEnd_token();
+ boolean that_present_end_token = true && that.isSetEnd_token();
+ if (this_present_end_token || that_present_end_token) {
+ if (!(this_present_end_token && that_present_end_token))
+ return false;
+ if (!this.end_token.equals(that.end_token))
+ return false;
+ }
+
+ boolean this_present_row_count = true;
+ boolean that_present_row_count = true;
+ if (this_present_row_count || that_present_row_count) {
+ if (!(this_present_row_count && that_present_row_count))
+ return false;
+ if (this.row_count != that.row_count)
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ HashCodeBuilder builder = new HashCodeBuilder();
+
+ boolean present_start_token = true && (isSetStart_token());
+ builder.append(present_start_token);
+ if (present_start_token)
+ builder.append(start_token);
+
+ boolean present_end_token = true && (isSetEnd_token());
+ builder.append(present_end_token);
+ if (present_end_token)
+ builder.append(end_token);
+
+ boolean present_row_count = true;
+ builder.append(present_row_count);
+ if (present_row_count)
+ builder.append(row_count);
+
+ return builder.toHashCode();
+ }
+
+ public int compareTo(CfSplit other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ CfSplit typedOther = (CfSplit)other;
+
+ lastComparison = Boolean.valueOf(isSetStart_token()).compareTo(typedOther.isSetStart_token());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetStart_token()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.start_token, typedOther.start_token);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetEnd_token()).compareTo(typedOther.isSetEnd_token());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetEnd_token()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.end_token, typedOther.end_token);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetRow_count()).compareTo(typedOther.isSetRow_count());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetRow_count()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.row_count, typedOther.row_count);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ case 1: // START_TOKEN
+ if (field.type == org.apache.thrift.protocol.TType.STRING) {
+ this.start_token = iprot.readString();
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 2: // END_TOKEN
+ if (field.type == org.apache.thrift.protocol.TType.STRING) {
+ this.end_token = iprot.readString();
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 3: // ROW_COUNT
+ if (field.type == org.apache.thrift.protocol.TType.I64) {
+ this.row_count = iprot.readI64();
+ setRow_countIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ if (!isSetRow_count()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'row_count' was not found in serialized data! Struct: " + toString());
+ }
+ validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (this.start_token != null) {
+ oprot.writeFieldBegin(START_TOKEN_FIELD_DESC);
+ oprot.writeString(this.start_token);
+ oprot.writeFieldEnd();
+ }
+ if (this.end_token != null) {
+ oprot.writeFieldBegin(END_TOKEN_FIELD_DESC);
+ oprot.writeString(this.end_token);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldBegin(ROW_COUNT_FIELD_DESC);
+ oprot.writeI64(this.row_count);
+ oprot.writeFieldEnd();
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("CfSplit(");
+ boolean first = true;
+
+ sb.append("start_token:");
+ if (this.start_token == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.start_token);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("end_token:");
+ if (this.end_token == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.end_token);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("row_count:");
+ sb.append(this.row_count);
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ if (start_token == null) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'start_token' was not present! Struct: " + toString());
+ }
+ if (end_token == null) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'end_token' was not present! Struct: " + toString());
+ }
+ // alas, we cannot check 'row_count' because it's a primitive and you chose the non-beans generator.
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
index 7e183c7..9d0701f 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
@@ -44,6 +44,6 @@ import org.slf4j.LoggerFactory;
public class Constants {
- public static final String VERSION = "19.32.0";
+ public static final String VERSION = "19.33.0";
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index cb79b01..c4c6570 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -35,6 +35,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.dht.IPartitioner;
@@ -44,18 +47,11 @@ import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.TokenRange;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.*;
import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
@@ -208,7 +204,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
public List<InputSplit> call() throws Exception
{
ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
- List<String> tokens = getSubSplits(keyspace, cfName, range, conf);
+ List<CfSplit> subSplits = getSubSplits(keyspace, cfName, range, conf);
assert range.rpc_endpoints.size() == range.endpoints.size() : "rpc_endpoints size must match endpoints size";
// turn the sub-ranges into InputSplits
String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
@@ -223,15 +219,21 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
}
Token.TokenFactory factory = partitioner.getTokenFactory();
- for (int i = 1; i < tokens.size(); i++)
+ for (CfSplit subSplit : subSplits)
{
- Token left = factory.fromString(tokens.get(i - 1));
- Token right = factory.fromString(tokens.get(i));
+ Token left = factory.fromString(subSplit.getStart_token());
+ Token right = factory.fromString(subSplit.getEnd_token());
Range<Token> range = new Range<Token>(left, right, partitioner);
List<Range<Token>> ranges = range.isWrapAround() ? range.unwrap() : ImmutableList.of(range);
for (Range<Token> subrange : ranges)
{
- ColumnFamilySplit split = new ColumnFamilySplit(factory.toString(subrange.left), factory.toString(subrange.right), endpoints);
+ ColumnFamilySplit split =
+ new ColumnFamilySplit(
+ factory.toString(subrange.left),
+ factory.toString(subrange.right),
+ subSplit.getRow_count(),
+ endpoints);
+
logger.debug("adding " + split);
splits.add(split);
}
@@ -240,7 +242,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
}
}
- private List<String> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
+ private List<CfSplit> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
{
int splitsize = ConfigHelper.getInputSplitSize(conf);
for (int i = 0; i < range.rpc_endpoints.size(); i++)
@@ -254,7 +256,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
{
Cassandra.Client client = ConfigHelper.createConnection(conf, host, ConfigHelper.getInputRpcPort(conf));
client.set_keyspace(keyspace);
- return client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
+ return client.describe_splits_ex(cfName, range.start_token, range.end_token, splitsize);
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 73f9786..c662932 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -145,12 +145,11 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
predicate = ConfigHelper.getInputSlicePredicate(conf);
boolean widerows = ConfigHelper.getInputIsWide(conf);
isEmptyPredicate = isEmptyPredicate(predicate);
- totalRowCount = ConfigHelper.getInputSplitSize(conf);
+ totalRowCount = (int) this.split.getLength();
batchSize = ConfigHelper.getRangeBatchSize(conf);
cfName = ConfigHelper.getInputColumnFamily(conf);
consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(conf));
-
keyspace = ConfigHelper.getInputKeyspace(conf);
try
@@ -189,7 +188,11 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
public boolean nextKeyValue() throws IOException
{
if (!iter.hasNext())
+ {
+ logger.debug("Finished scanning " + iter.rowsRead() + " rows (estimate was: " + totalRowCount + ")");
return false;
+ }
+
currentRow = iter.next();
return true;
}
@@ -482,7 +485,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> next = wideColumns.next();
lastColumn = next.right.values().iterator().next().name();
- maybeCountRow(next);
+ maybeIncreaseRowCounter(next);
return next;
}
@@ -491,7 +494,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
* Increases the row counter only if we really moved to the next row.
* @param next just fetched row slice
*/
- private void maybeCountRow(Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> next)
+ private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> next)
{
ByteBuffer currentKey = next.left;
if (!currentKey.equals(lastCountedKey))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
index bd2e487..4085c68 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
@@ -33,14 +33,22 @@ public class ColumnFamilySplit extends InputSplit implements Writable, org.apach
{
private String startToken;
private String endToken;
+ private long length;
private String[] dataNodes;
+ @Deprecated
public ColumnFamilySplit(String startToken, String endToken, String[] dataNodes)
{
+ this(startToken, endToken, Long.MAX_VALUE, dataNodes);
+ }
+
+ public ColumnFamilySplit(String startToken, String endToken, long length, String[] dataNodes)
+ {
assert startToken != null;
assert endToken != null;
this.startToken = startToken;
this.endToken = endToken;
+ this.length = length;
this.dataNodes = dataNodes;
}
@@ -58,8 +66,7 @@ public class ColumnFamilySplit extends InputSplit implements Writable, org.apach
public long getLength()
{
- // only used for sorting splits. we don't have the capability, yet.
- return Long.MAX_VALUE;
+ return length;
}
public String[] getLocations()
@@ -76,7 +83,7 @@ public class ColumnFamilySplit extends InputSplit implements Writable, org.apach
{
out.writeUTF(startToken);
out.writeUTF(endToken);
-
+ out.writeLong(length);
out.writeInt(dataNodes.length);
for (String endpoint : dataNodes)
{
@@ -88,6 +95,7 @@ public class ColumnFamilySplit extends InputSplit implements Writable, org.apach
{
startToken = in.readUTF();
endToken = in.readUTF();
+ length = in.readLong();
int numOfEndpoints = in.readInt();
dataNodes = new String[numOfEndpoints];
http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index b1eaa1e..80c3f46 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -36,6 +36,7 @@ import com.google.common.base.Supplier;
import com.google.common.collect.*;
import org.apache.cassandra.metrics.ClientRequestMetrics;
+
import org.apache.log4j.Level;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -2184,28 +2185,50 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
/**
- * @return list of Tokens (_not_ keys!) breaking up the data this node is responsible for into pieces of roughly keysPerSplit
+ * @return list of Token ranges (_not_ keys!) together with estimated key count,
+ * breaking up the data this node is responsible for into pieces of roughly keysPerSplit
*/
- public List<Token> getSplits(String table, String cfName, Range<Token> range, int keysPerSplit)
+ public List<Pair<Range<Token>, Long>> getSplits(String table, String cfName, Range<Token> range, int keysPerSplit)
{
- List<Token> tokens = new ArrayList<Token>();
- // we use the actual Range token for the first and last brackets of the splits to ensure correctness
- tokens.add(range.left);
-
Table t = Table.open(table);
ColumnFamilyStore cfs = t.getColumnFamilyStore(cfName);
List<DecoratedKey> keys = keySamples(Collections.singleton(cfs), range);
- int splits = keys.size() * DatabaseDescriptor.getIndexInterval() / keysPerSplit;
- if (keys.size() >= splits)
+ final long totalRowCountEstimate = (keys.size() + 1) * DatabaseDescriptor.getIndexInterval();
+
+ // splitCount should be much smaller than number of key samples, to avoid huge sampling error
+ final int minSamplesPerSplit = 4;
+ final int maxSplitCount = keys.size() / minSamplesPerSplit + 1;
+ final int splitCount = Math.max(1, Math.min(maxSplitCount, (int)(totalRowCountEstimate / keysPerSplit)));
+
+ List<Token> tokens = keysToTokens(range, keys);
+ return getSplits(tokens, splitCount);
+ }
+
+ private List<Pair<Range<Token>, Long>> getSplits(List<Token> tokens, int splitCount)
+ {
+ final double step = (double) (tokens.size() - 1) / splitCount;
+ int prevIndex = 0;
+ Token prevToken = tokens.get(0);
+ List<Pair<Range<Token>, Long>> splits = Lists.newArrayListWithExpectedSize(splitCount);
+ for (int i = 1; i <= splitCount; i++)
{
- for (int i = 1; i < splits; i++)
- {
- int index = i * (keys.size() / splits);
- tokens.add(keys.get(index).token);
- }
+ int index = (int) Math.round(i * step);
+ Token token = tokens.get(index);
+ long rowCountEstimate = (index - prevIndex) * DatabaseDescriptor.getIndexInterval();
+ splits.add(Pair.create(new Range<Token>(prevToken, token), rowCountEstimate));
+ prevIndex = index;
+ prevToken = token;
}
+ return splits;
+ }
+ private List<Token> keysToTokens(Range<Token> range, List<DecoratedKey> keys)
+ {
+ List<Token> tokens = Lists.newArrayListWithExpectedSize(keys.size() + 2);
+ tokens.add(range.left);
+ for (DecoratedKey key : keys)
+ tokens.add(key.token);
tokens.add(range.right);
return tokens;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index ad416f3..3bf155e 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -30,6 +30,8 @@ import java.util.zip.Inflater;
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
+import org.apache.cassandra.hadoop.ColumnFamilySplit;
+import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -882,18 +884,33 @@ public class CassandraServer implements Cassandra.Iface
return DatabaseDescriptor.getEndpointSnitch().getClass().getName();
}
+ @Deprecated
public List<String> describe_splits(String cfName, String start_token, String end_token, int keys_per_split)
throws TException, InvalidRequestException
{
+ List<CfSplit> splits = describe_splits_ex(cfName, start_token, end_token, keys_per_split);
+ List<String> result = new ArrayList<String>(splits.size() + 1);
+
+ result.add(splits.get(0).getStart_token());
+ for (CfSplit cfSplit : splits)
+ result.add(cfSplit.getEnd_token());
+
+ return result;
+ }
+
+ @Override
+ public List<CfSplit> describe_splits_ex(String cfName, String start_token, String end_token, int keys_per_split)
+ throws InvalidRequestException, TException
+ {
// TODO: add keyspace authorization call post CASSANDRA-1425
Token.TokenFactory tf = StorageService.getPartitioner().getTokenFactory();
- List<Token> tokens = StorageService.instance.getSplits(state().getKeyspace(), cfName, new Range<Token>(tf.fromString(start_token), tf.fromString(end_token)), keys_per_split);
- List<String> splits = new ArrayList<String>(tokens.size());
- for (Token token : tokens)
- {
- splits.add(tf.toString(token));
- }
- return splits;
+ Range<Token> tr = new Range<Token>(tf.fromString(start_token), tf.fromString(end_token));
+ List<Pair<Range<Token>, Long>> splits =
+ StorageService.instance.getSplits(state().getKeyspace(), cfName, tr, keys_per_split);
+ List<CfSplit> result = new ArrayList<CfSplit>(splits.size());
+ for (Pair<Range<Token>, Long> split : splits)
+ result.add(new CfSplit(split.left.left.toString(), split.left.right.toString(), split.right));
+ return result;
}
public void login(AuthenticationRequest auth_request) throws AuthenticationException, AuthorizationException, TException