Posted to commits@hbase.apache.org by li...@apache.org on 2014/03/12 22:17:20 UTC
svn commit: r1576909 [7/18] - in /hbase/branches/0.89-fb/src: ./
examples/thrift/ main/java/org/apache/hadoop/hbase/
main/java/org/apache/hadoop/hbase/avro/
main/java/org/apache/hadoop/hbase/avro/generated/
main/java/org/apache/hadoop/hbase/client/ mai...
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/KeyOnlyFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/KeyOnlyFilter.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/KeyOnlyFilter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/KeyOnlyFilter.java Wed Mar 12 21:17:13 2014
@@ -22,13 +22,12 @@ package org.apache.hadoop.hbase.filter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.util.Bytes;
-
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+
import com.google.common.base.Preconditions;
/**
@@ -45,7 +44,7 @@ public class KeyOnlyFilter extends Filte
public KeyOnlyFilter(boolean lenAsVal) { this.lenAsVal = lenAsVal; }
public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {
- Preconditions.checkArgument(filterArguments.size() == 0,
+ Preconditions.checkArgument(filterArguments.isEmpty(),
"Expected: 0 but got: %s", filterArguments.size());
return new KeyOnlyFilter();
}
@@ -56,10 +55,12 @@ public class KeyOnlyFilter extends Filte
return ReturnCode.INCLUDE;
}
+ @Override
public void write(DataOutput out) throws IOException {
out.writeBoolean(this.lenAsVal);
}
+ @Override
public void readFields(DataInput in) throws IOException {
this.lenAsVal = in.readBoolean();
}
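
A note on the isEmpty() cleanup above: Preconditions.checkArgument fails fast with a formatted IllegalArgumentException when the filter is handed arguments it does not accept. A minimal standalone sketch of the idiom (the helper mirrors createFilterFromArguments; the main harness is illustrative only):

    import java.util.ArrayList;
    import com.google.common.base.Preconditions;

    public class CheckArgumentSketch {
      // Mirrors KeyOnlyFilter.createFilterFromArguments: the filter takes no
      // arguments, so any non-empty argument list is rejected.
      static void requireNoArguments(ArrayList<byte[]> filterArguments) {
        Preconditions.checkArgument(filterArguments.isEmpty(),
            "Expected: 0 but got: %s", filterArguments.size());
      }

      public static void main(String[] args) {
        ArrayList<byte[]> arguments = new ArrayList<byte[]>();
        requireNoArguments(arguments);   // passes: no arguments
        arguments.add(new byte[] { 'x' });
        requireNoArguments(arguments);   // throws: Expected: 0 but got: 1
      }
    }
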
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/MultipleColumnPrefixFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/MultipleColumnPrefixFilter.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/MultipleColumnPrefixFilter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/MultipleColumnPrefixFilter.java Wed Mar 12 21:17:13 2014
@@ -20,18 +20,18 @@
package org.apache.hadoop.hbase.filter;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.util.Bytes;
-
+import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.io.DataInput;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;
-import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.util.Bytes;
/**
* This filter is used for selecting only those keys with columns that match
@@ -67,7 +67,7 @@ public class MultipleColumnPrefixFilter
@Override
public ReturnCode filterKeyValue(KeyValue kv, List<KeyValueScanner> scanners) {
- if (sortedPrefixes.size() == 0 || kv.getBuffer() == null) {
+ if (sortedPrefixes.isEmpty() || kv.getBuffer() == null) {
return ReturnCode.INCLUDE;
} else {
return filterColumn(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
@@ -108,6 +108,7 @@ public class MultipleColumnPrefixFilter
return new MultipleColumnPrefixFilter(prefixes);
}
+ @Override
public void write(DataOutput out) throws IOException {
out.writeInt(sortedPrefixes.size());
for (byte [] element : sortedPrefixes) {
@@ -115,6 +116,7 @@ public class MultipleColumnPrefixFilter
}
}
+ @Override
public void readFields(DataInput in) throws IOException {
int x = in.readInt();
this.sortedPrefixes = createTreeSet();
@@ -123,6 +125,7 @@ public class MultipleColumnPrefixFilter
}
}
+ @Override
public KeyValue getNextKeyHint(KeyValue kv) {
return KeyValue.createFirstOnRow(
kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java Wed Mar 12 21:17:13 2014
@@ -244,7 +244,7 @@ public final class ParseConstants {
* RegexStringType byte array
*/
public static final byte [] regexStringType = new byte [] {'r','e','g','e','x',
- 's','t','r','i','n','g'};
+ 's','t','r','i','n','g'};
/**
* SubstringType byte array
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseFilter.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseFilter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseFilter.java Wed Mar 12 21:17:13 2014
@@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.util.Byte
* in a scanner object which is then returned
* <p>
* More documentation on this Filter Language can be found here
- * http://hbase.apache.org/book.html#thrift.filter-language
+ * https://our.intern.facebook.com/intern/wiki/index.php/HBase/Filter_Language
*
*/
public class ParseFilter {
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/TFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/TFilter.java?rev=1576909&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/TFilter.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/TFilter.java Wed Mar 12 21:17:13 2014
@@ -0,0 +1,196 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.filter;
+
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+import com.google.common.base.Objects;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+@ThriftStruct
+public class TFilter implements Filter {
+ private final Log LOG = LogFactory.getLog(TFilter.class);
+ private byte[] filterBytes;
+ private Filter filter;
+
+ /**
+ * Empty constructor for hadoop rpc
+ */
+ public TFilter() {
+ }
+
+ /**
+ * Constructor used by thrift.
+ * Use {@link TFilter#getTFilter(Filter)} to construct a TFilter object from
+ * a Filter object. We serialize the Filter object using the older Writable
+ * format and deserialize it on the server side as well.
+ * @param filterBytes the Writable-serialized filter bytes
+ * @throws IOException
+ */
+ @ThriftConstructor
+ public TFilter(@ThriftField(1) byte[] filterBytes) throws IOException {
+ this.filterBytes = filterBytes;
+ this.filter = getFilter();
+ }
+
+ public TFilter(byte[] filterBytes, Filter filter) {
+ this.filterBytes = filterBytes;
+ this.filter = filter;
+ }
+
+ @ThriftField(1)
+ public byte[] getFilterBytes() {
+ return this.filterBytes;
+ }
+
+ /**
+ * Constructs the Filter by deserializing the filter bytes with
+ * HbaseObjectWritable.
+ * @return the deserialized Filter, or null if there are no filter bytes
+ * @throws IOException
+ */
+ public Filter getFilter() throws IOException {
+ if (filter != null) return filter;
+ if (this.filterBytes == null || this.filterBytes.length == 0) return null;
+ DataInput in = new DataInputStream(
+ new ByteArrayInputStream(this.filterBytes));
+ return filter = (Filter)HbaseObjectWritable.readObject(in, null);
+ }
+
+ /**
+ * Constructs the thrift serialized representation of Filter using
+ * HBaseObjectWritable helper methods.
+ * @param filter the filter to wrap and serialize
+ * @return a TFilter carrying the serialized filter, or null if filter is null
+ * @throws IOException
+ */
+ public static TFilter getTFilter(Filter filter) throws IOException {
+ if (filter == null) return null;
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(stream);
+ HbaseObjectWritable.writeObject(out, filter, filter.getClass(), null);
+ out.close();
+ return new TFilter(stream.toByteArray(), filter);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.filterBytes = Bytes.readByteArray(in);
+ this.filter = getFilter();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Bytes.writeByteArray(out, this.filterBytes);
+ }
+
+ @Override
+ public void reset() {
+ this.filter.reset();
+ }
+
+ @Override
+ public boolean filterRowKey(byte[] buffer, int offset, int length) {
+ return this.filter.filterRowKey(buffer, offset, length);
+ }
+
+ @Override
+ public boolean filterAllRemaining() {
+ return this.filter.filterAllRemaining();
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(KeyValue v) {
+ return this.filter.filterKeyValue(v);
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(KeyValue v, List<KeyValueScanner> scanners) {
+ return this.filter.filterKeyValue(v, scanners);
+ }
+
+ @Override
+ public void filterRow(List<KeyValue> kvs) {
+ this.filter.filterRow(kvs);
+ }
+
+ @Override
+ public boolean hasFilterRow() {
+ return this.filter.hasFilterRow();
+ }
+
+ @Override
+ public boolean filterRow() {
+ return this.filter.filterRow();
+ }
+
+ @Override
+ public KeyValue getNextKeyHint(KeyValue currentKV) {
+ return this.filter.getNextKeyHint(currentKV);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(this.filterBytes);
+ }
+
+ @Override
+ public String toString() {
+ if (this.filter == null) {
+ return null;
+ }
+
+ return this.filter.toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ TFilter other = (TFilter) obj;
+ if (!Arrays.equals(filterBytes, other.filterBytes)) {
+ return false;
+ }
+ return true;
+ }
+}
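
TFilter above exists so an arbitrary Filter can cross the Swift/Thrift boundary: getTFilter() Writable-serializes the filter into filterBytes on the client, the @ThriftConstructor rebuilds it on the server, and every Filter callback delegates to the wrapped instance. A sketch of the round trip, assuming the 0.89-fb classes in this commit are on the classpath:

    import java.io.IOException;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
    import org.apache.hadoop.hbase.filter.TFilter;

    public class TFilterRoundTrip {
      public static void main(String[] args) throws IOException {
        // Client side: wrap a concrete Filter. getTFilter() serializes it with
        // HbaseObjectWritable into the thrift-visible filterBytes field.
        Filter original = new KeyOnlyFilter();
        TFilter wire = TFilter.getTFilter(original);
        byte[] filterBytes = wire.getFilterBytes(); // what thrift ships

        // Server side: the @ThriftConstructor deserializes eagerly, and all
        // Filter callbacks (reset, filterKeyValue, ...) delegate to the result.
        TFilter received = new TFilter(filterBytes);
        Filter rebuilt = received.getFilter();
        System.out.println(rebuilt.getClass().getSimpleName()); // KeyOnlyFilter
      }
    }
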
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Wed Mar 12 21:17:13 2014
@@ -18,15 +18,6 @@
package org.apache.hadoop.hbase.io;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Array;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
@@ -41,27 +32,31 @@ import org.apache.hadoop.hbase.HServerAd
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.MultiAction;
-import org.apache.hadoop.hbase.client.MultiResponse;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.IntegerOrResultOrException;
+import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.client.MultiPutResponse;
+import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
+import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompoundRowPrefixFilter;
import org.apache.hadoop.hbase.filter.DependentColumnFilter;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
@@ -69,6 +64,7 @@ import org.apache.hadoop.hbase.filter.Ro
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SkipFilter;
+import org.apache.hadoop.hbase.filter.TimestampsFilter;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
@@ -87,6 +83,15 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
/**
* This is a customized version of the polymorphic hadoop
* {@link ObjectWritable}. It removes UTF8 (HADOOP-414).
@@ -207,7 +212,7 @@ public class HbaseObjectWritable impleme
// Online schema change
addToMap(Integer.class, code++);
addToMap(Pair.class, code++);
-
+
// Favored Assignment
addToMap(AssignmentPlan.class, code++);
@@ -220,6 +225,14 @@ public class HbaseObjectWritable impleme
addToMap(MultiResponse.class, code++);
addToMap(HFileHistogram.Bucket.class, code++);
addToMap(CompoundRowPrefixFilter.class, code++);
+
+ addToMap(IntegerOrResultOrException.Type.class, code++);
+ addToMap(FilterList.class, code++);
+ addToMap(ColumnPaginationFilter.class, code++);
+ addToMap(TimestampsFilter.class, code++);
+ addToMap(MultipleColumnPrefixFilter.class, code++);
+ addToMap(FamilyFilter.class, code++);
+
}
private Class<?> declaredClass;
@@ -514,7 +527,9 @@ public class HbaseObjectWritable impleme
Array.set(instance, i, readObject(in, conf));
}
}
- } else if (List.class.isAssignableFrom(declaredClass)) { // List
+ } else if (declaredClass == Exception.class) {
+ instance = new Exception();
+ } else if (List.class.isAssignableFrom(declaredClass)) { // List
int length = in.readInt();
instance = new ArrayList(length);
for (int i = 0; i < length; i++) {
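
Two details in the HbaseObjectWritable hunks deserve a note: addToMap assigns wire codes sequentially, so the new filter classes are appended after all existing entries to keep previously assigned codes stable between client and server, and readObject now special-cases a bare Exception. A sketch of the code-mapped round trip this registration enables (TimestampsFilter's List constructor is taken from the upstream class and assumed unchanged here):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.TimestampsFilter;
    import org.apache.hadoop.hbase.io.HbaseObjectWritable;

    public class ObjectWritableRoundTrip {
      public static void main(String[] args) throws IOException {
        // TimestampsFilter is now in the code map, so it is written as a
        // compact class code rather than a full class name.
        Filter filter = new TimestampsFilter(Arrays.asList(1L, 2L));
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        HbaseObjectWritable.writeObject(out, filter, filter.getClass(), null);
        out.close();

        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray()));
        Filter back = (Filter) HbaseObjectWritable.readObject(in, null);
        System.out.println(back.getClass().getSimpleName()); // TimestampsFilter
      }
    }
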
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java Wed Mar 12 21:17:13 2014
@@ -86,7 +86,7 @@ implements WritableComparable<ImmutableB
* Get the data from the BytesWritable.
* @return The data is only valid between offset and offset+length.
*/
- public byte [] get() {
+ public byte[] get() {
if (this.bytes == null) {
throw new IllegalStateException("Uninitialized. Null constructor " +
"called w/o accompanying readFields invocation");
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java?rev=1576909&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java Wed Mar 12 21:17:13 2014
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Copied from guava source code v15 (LimitedInputStream)
+ * Guava deprecated LimitInputStream in v14 and removed it in v15. Copying this class here
+ * allows HBase to remain compatible with guava 11 through 15+.
+ */
+public final class LimitInputStream extends FilterInputStream {
+ private long left;
+ private long mark = -1;
+
+ public LimitInputStream(InputStream in, long limit) {
+ super(in);
+ checkNotNull(in);
+ checkArgument(limit >= 0, "limit must be non-negative");
+ left = limit;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return (int) Math.min(in.available(), left);
+ }
+
+ // it's okay to mark even if mark isn't supported, as reset won't work
+ @Override
+ public synchronized void mark(int readLimit) {
+ in.mark(readLimit);
+ mark = left;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (left == 0) {
+ return -1;
+ }
+
+ int result = in.read();
+ if (result != -1) {
+ --left;
+ }
+ return result;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (left == 0) {
+ return -1;
+ }
+
+ len = (int) Math.min(len, left);
+ int result = in.read(b, off, len);
+ if (result != -1) {
+ left -= result;
+ }
+ return result;
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ if (!in.markSupported()) {
+ throw new IOException("Mark not supported");
+ }
+ if (mark == -1) {
+ throw new IOException("Mark not set");
+ }
+
+ in.reset();
+ left = mark;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ n = Math.min(n, left);
+ long skipped = in.skip(n);
+ left -= skipped;
+ return skipped;
+ }
+}
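
Since LimitInputStream is copied verbatim from Guava, its contract is the familiar one: reads are capped at the byte limit and EOF is reported once the limit is exhausted, whatever the underlying stream still holds. A small usage sketch:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.hbase.io.LimitInputStream;

    public class LimitInputStreamDemo {
      public static void main(String[] args) throws IOException {
        InputStream raw = new ByteArrayInputStream(new byte[100]);
        InputStream limited = new LimitInputStream(raw, 10); // cap at 10 bytes

        byte[] chunk = new byte[64];
        int n = limited.read(chunk);  // 10: the read is clamped to the limit
        int eof = limited.read();     // -1: limit exhausted, even though the
                                      // underlying stream has 90 bytes left
        System.out.println(n + " " + eof);
      }
    }
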
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java Wed Mar 12 21:17:13 2014
@@ -28,6 +28,10 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.hbase.util.Bytes;
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+
/**
* Represents an interval of version timestamps.
* <p>
@@ -36,17 +40,22 @@ import org.apache.hadoop.hbase.util.Byte
* <p>
* Only used internally; should not be accessed directly by clients.
*/
+@ThriftStruct
public class TimeRange implements Writable {
- private long minStamp = 0L;
- private long maxStamp = Long.MAX_VALUE;
- private boolean allTime = false;
+ @ThriftField(1)
+ public boolean allTime = false;
+
+ @ThriftField(2)
+ public long minStamp = 0L;
+
+ @ThriftField(3)
+ public long maxStamp = Long.MAX_VALUE;
/**
* Default constructor.
* Represents interval [0, Long.MAX_VALUE) (allTime)
*/
public TimeRange() {
- allTime = true;
}
/**
@@ -62,20 +71,32 @@ public class TimeRange implements Writab
* @param minStamp the minimum timestamp value, inclusive
*/
public TimeRange(byte [] minStamp) {
- this.minStamp = Bytes.toLong(minStamp);
+ this.minStamp = Bytes.toLong(minStamp);
+ }
+
+ public TimeRange(final long minStamp, final long maxStamp) throws IOException {
+ this(false, minStamp, maxStamp);
}
+
/**
+ * Thrift Constructor
* Represents interval [minStamp, maxStamp)
* @param minStamp the minimum timestamp, inclusive
* @param maxStamp the maximum timestamp, exclusive
* @throws IOException
*/
- public TimeRange(long minStamp, long maxStamp)
+ @ThriftConstructor
+ public TimeRange(
+ @ThriftField(1) final boolean allTime,
+ @ThriftField(2) final long minStamp,
+ @ThriftField(3) final long maxStamp
+ )
throws IOException {
if(maxStamp < minStamp) {
throw new IOException("maxStamp is smaller than minStamp");
}
+ this.allTime = allTime;
this.minStamp = minStamp;
this.maxStamp = maxStamp;
}
@@ -115,8 +136,8 @@ public class TimeRange implements Writab
* @return true if within TimeRange, false if not
*/
public boolean withinTimeRange(byte [] bytes, int offset) {
- if(allTime) return true;
- return withinTimeRange(Bytes.toLong(bytes, offset));
+ if(allTime) return true;
+ return withinTimeRange(Bytes.toLong(bytes, offset));
}
/**
@@ -128,9 +149,9 @@ public class TimeRange implements Writab
* @return true if within TimeRange, false if not
*/
public boolean withinTimeRange(long timestamp) {
- if(allTime) return true;
- // check if >= minStamp
- return (minStamp <= timestamp && timestamp < maxStamp);
+ if(allTime) return true;
+ // check if >= minStamp
+ return (minStamp <= timestamp && timestamp < maxStamp);
}
/**
@@ -186,4 +207,32 @@ public class TimeRange implements Writab
out.writeLong(maxStamp);
out.writeBoolean(this.allTime);
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (allTime ? 1231 : 1237);
+ result = prime * result + (int) (maxStamp ^ (maxStamp >>> 32));
+ result = prime * result + (int) (minStamp ^ (minStamp >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ TimeRange other = (TimeRange) obj;
+ if (allTime != other.allTime)
+ return false;
+ if (maxStamp != other.maxStamp)
+ return false;
+ if (minStamp != other.minStamp)
+ return false;
+ return true;
+ }
}
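
The TimeRange changes are worth a concrete walk-through: the two-argument constructor now delegates to the thrift constructor with allTime = false, and the default constructor no longer sets allTime = true, so the all-time case falls through to the [0, Long.MAX_VALUE) stamp comparison instead. A sketch of the resulting semantics:

    import java.io.IOException;
    import org.apache.hadoop.hbase.io.TimeRange;

    public class TimeRangeDemo {
      public static void main(String[] args) throws IOException {
        // Interval is [minStamp, maxStamp): min inclusive, max exclusive.
        TimeRange range = new TimeRange(100L, 200L);
        System.out.println(range.withinTimeRange(100L)); // true
        System.out.println(range.withinTimeRange(200L)); // false

        // Default constructor after this commit: allTime stays false, so the
        // match comes from 0 <= ts < Long.MAX_VALUE rather than the shortcut.
        TimeRange all = new TimeRange();
        System.out.println(all.withinTimeRange(Long.MAX_VALUE - 1)); // true
      }
    }
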
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java Wed Mar 12 21:17:13 2014
@@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.util.Byte
import org.apache.hadoop.io.compress.Compressor;
import com.google.common.base.Preconditions;
-import com.google.common.io.NullOutputStream;
+import org.apache.commons.io.output.NullOutputStream;
/**
* Encapsulates a data block compressed using a particular encoding algorithm.
@@ -136,8 +136,8 @@ public class EncodedDataBlock {
*/
public static int getCompressedSize(Algorithm algo, Compressor compressor,
byte[] inputBuffer, int offset, int length) throws IOException {
- DataOutputStream compressedStream = new DataOutputStream(
- new NullOutputStream());
+ DataOutputStream compressedStream =
+ new DataOutputStream(NullOutputStream.NULL_OUTPUT_STREAM);
if (compressor != null) {
compressor.reset();
}
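
The import swap above replaces Guava's NullOutputStream, removed in Guava 15, with the commons-io singleton. The sink discards every byte while DataOutputStream keeps counting, which is exactly what getCompressedSize needs to measure output size without buffering it. A sketch of the idiom:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.commons.io.output.NullOutputStream;

    public class NullSinkDemo {
      public static void main(String[] args) throws IOException {
        // The shared singleton replaces Guava's `new NullOutputStream()`.
        DataOutputStream out =
            new DataOutputStream(NullOutputStream.NULL_OUTPUT_STREAM);
        out.writeLong(42L);      // 8 bytes, discarded
        out.writeUTF("hbase");   // 2-byte length prefix + 5 bytes, discarded
        System.out.println(out.size()); // 15: bytes that would have been written
      }
    }
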
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java Wed Mar 12 21:17:13 2014
@@ -333,12 +333,12 @@ public abstract class AbstractHFileReade
public int seekTo(byte[] key) throws IOException {
return seekTo(key, 0, key.length);
}
-
+
@Override
public boolean seekBefore(byte[] key) throws IOException {
return seekBefore(key, 0, key.length);
}
-
+
@Override
public int reseekTo(byte[] key) throws IOException {
return reseekTo(key, 0, key.length);
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java Wed Mar 12 21:17:13 2014
@@ -90,7 +90,7 @@ public abstract class AbstractHFileWrite
/** The compression algorithm used. NONE if no compression. */
protected final Compression.Algorithm compressAlgo;
-
+
/**
* The data block encoding which will be used.
* {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheColumnFamilySummary.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheColumnFamilySummary.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheColumnFamilySummary.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheColumnFamilySummary.java Wed Mar 12 21:17:13 2014
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
/**
- * BlockCacheColumnFamilySummary represents a summary of the blockCache usage
+ * BlockCacheColumnFamilySummary represents a summary of the blockCache usage
* at Table/ColumnFamily granularity.
* <br><br>
* As ColumnFamilies are owned by Tables, a summary by ColumnFamily implies that
@@ -45,11 +45,11 @@ public class BlockCacheColumnFamilySumma
* Default constructor for Writable
*/
public BlockCacheColumnFamilySummary() {
-
+
}
-
+
/**
- *
+ *
* @param table table
* @param columnFamily columnFamily
*/
@@ -57,59 +57,59 @@ public class BlockCacheColumnFamilySumma
this.table = table;
this.columnFamily = columnFamily;
}
-
+
/**
- *
+ *
* @return table
*/
public String getTable() {
return table;
}
/**
- *
+ *
* @param table (table that owns the cached block)
*/
public void setTable(String table) {
this.table = table;
}
/**
- *
+ *
* @return columnFamily
*/
public String getColumnFamily() {
return columnFamily;
}
/**
- *
+ *
* @param columnFamily (columnFamily that owns the cached block)
*/
public void setColumnFamily(String columnFamily) {
this.columnFamily = columnFamily;
}
-
+
/**
- *
+ *
* @return blocks in the cache
*/
public int getBlocks() {
return blocks;
}
/**
- *
+ *
* @param blocks in the cache
*/
public void setBlocks(int blocks) {
this.blocks = blocks;
}
-
+
/**
- *
+ *
* @return heapSize in the cache
*/
public long getHeapSize() {
return heapSize;
}
-
+
/**
* Increments the number of blocks in the cache for this entry
*/
@@ -118,7 +118,7 @@ public class BlockCacheColumnFamilySumma
}
/**
- *
+ *
* @param heapSize to increment
*/
public void incrementHeapSize(long heapSize) {
@@ -126,13 +126,13 @@ public class BlockCacheColumnFamilySumma
}
/**
- *
+ *
* @param heapSize (total heapSize for the table/CF)
*/
public void setHeapSize(long heapSize) {
this.heapSize = heapSize;
}
-
+
@Override
public void readFields(DataInput in) throws IOException {
table = in.readUTF();
@@ -140,7 +140,7 @@ public class BlockCacheColumnFamilySumma
blocks = in.readInt();
heapSize = in.readLong();
}
-
+
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(table);
@@ -148,7 +148,7 @@ public class BlockCacheColumnFamilySumma
out.writeInt(blocks);
out.writeLong(heapSize);
}
-
+
@Override
public int hashCode() {
final int prime = 31;
@@ -179,15 +179,15 @@ public class BlockCacheColumnFamilySumma
return false;
return true;
}
-
-
-
+
+
+
@Override
public String toString() {
return "BlockCacheSummaryEntry [table=" + table + ", columnFamily="
+ columnFamily + ", blocks=" + blocks + ", heapSize=" + heapSize + "]";
}
-
+
/**
* Construct a BlockCacheSummaryEntry from a full StoreFile Path
* <br><br>
@@ -200,12 +200,12 @@ public class BlockCacheColumnFamilySumma
* '70236052' = Region <br>
* 'info' = ColumnFamily <br>
* '3944417774205889744' = StoreFile
- *
+ *
* @param path (full StoreFile Path)
* @return BlockCacheSummaryEntry
*/
public static BlockCacheColumnFamilySummary createFromStoreFilePath(Path path) {
-
+
// The full path will look something like this...
// hdfs://localhost:51169/user/doug.meil/-ROOT-/70236052/info/3944417774205889744
// tbl region cf sf
@@ -218,7 +218,7 @@ public class BlockCacheColumnFamilySumma
String table = s[s.length - 4]; // 4th from the end
String cf = s[s.length - 2]; // 2nd from the end
bcse = new BlockCacheColumnFamilySummary(table, cf);
- }
+ }
return bcse;
}
@@ -227,13 +227,13 @@ public class BlockCacheColumnFamilySumma
int i = table.compareTo(o.getTable());
if (i != 0) {
return i;
- }
+ }
return columnFamily.compareTo(o.getColumnFamily());
}
/**
* Creates a new BlockCacheSummaryEntry
- *
+ *
* @param e BlockCacheSummaryEntry
* @return new BlockCacheSummaryEntry
*/
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java Wed Mar 12 21:17:13 2014
@@ -28,7 +28,7 @@ public interface Cacheable extends HeapS
* @return the block type of this cached HFile block
*/
public BlockType getBlockType();
-
+
/**
* @return the metrics object identified by table and column family
*/
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java Wed Mar 12 21:17:13 2014
@@ -27,6 +27,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.commons.io.output.NullOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -36,7 +37,6 @@ import org.apache.hadoop.io.RawComparato
import static org.apache.hadoop.hbase.io.hfile.HFile.MIN_FORMAT_VERSION;
import static org.apache.hadoop.hbase.io.hfile.HFile.MAX_FORMAT_VERSION;
-import com.google.common.io.NullOutputStream;
/**
* The {@link HFile} has a fixed trailer which contains offsets to other
@@ -120,7 +120,8 @@ public class FixedFileTrailer {
version <= MAX_FORMAT_VERSION;
++version) {
FixedFileTrailer fft = new FixedFileTrailer(version);
- DataOutputStream dos = new DataOutputStream(new NullOutputStream());
+ DataOutputStream dos =
+ new DataOutputStream(NullOutputStream.NULL_OUTPUT_STREAM);
try {
fft.serialize(dos);
} catch (IOException ex) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java Wed Mar 12 21:17:13 2014
@@ -610,7 +610,7 @@ public class HFileBlockIndex {
* what was written into the root level by
* {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
* offset that function returned.
- *
+ *
* @param in the buffered input stream or wrapped byte input stream
* @param numEntries the number of root-level index entries
* @throws IOException
@@ -636,7 +636,7 @@ public class HFileBlockIndex {
* Read the root-level metadata of a multi-level block index. Based on
* {@link #readRootIndex(DataInput, int)}, but also reads metadata
* necessary to compute the mid-key in a multi-level index.
- *
+ *
* @param in the buffered or byte input stream to read from
* @param numEntries the number of root-level index entries
* @throws IOException
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java Wed Mar 12 21:17:13 2014
@@ -31,7 +31,7 @@ public interface InlineBlockWriter {
/**
* Determines whether there is a new block to be written out.
- *
+ *
* @param closing
* whether the file is being closed, in which case we need to write
* out all available data and not wait to accumulate another block
@@ -41,7 +41,7 @@ public interface InlineBlockWriter {
/**
* Writes the block to the provided stream. Must not write any magic records.
* Called only if {@link #shouldWriteBlock(boolean)} returned true.
- *
+ *
* @param out
* a stream (usually a compressing stream) to write the block to
*/
@@ -52,7 +52,7 @@ public interface InlineBlockWriter {
* compressed size have been determined. Can be used to add an entry to a
* block index. If this type of inline blocks needs a block index, the inline
* block writer is responsible for maintaining it.
- *
+ *
* @param offset the offset of the block in the stream
* @param onDiskSize the on-disk size of the block
* @param uncompressedSize the uncompressed size of the block
@@ -64,7 +64,7 @@ public interface InlineBlockWriter {
* The type of blocks this block writer produces.
*/
BlockType getInlineBlockType();
-
+
/**
* @return true if inline blocks produced by this writer should be cached
*/
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Wed Mar 12 21:17:13 2014
@@ -325,7 +325,7 @@ public class LruBlockCache implements Bl
* Helper function that updates the local size counter and also updates any
* per-cf or per-blocktype metrics it can discern from given
* {@link CachedBlock}
- *
+ *
* @param cb
* @param evict
*/
@@ -371,7 +371,7 @@ public class LruBlockCache implements Bl
evictBlock(cb);
return true;
}
-
+
/**
* Adds specific HFile to a list of recently closed files. Next time the eviction happens
* all blocks of the File will be evicted.
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HFileHistogram.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HFileHistogram.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HFileHistogram.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HFileHistogram.java Wed Mar 12 21:17:13 2014
@@ -34,6 +34,10 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+
/**
* Captures histogram of statistics about the distribution of rows in the HFile.
* This needs to be serialized onto the HFile.
@@ -43,6 +47,7 @@ public interface HFileHistogram {
* This enum provides the set of additional stats the Histogram can store.
* (TODO) manukranthk : Integrate HFileStats.
*/
+ @ThriftStruct
public static enum HFileStat {
KEYVALUECOUNT
}
@@ -50,14 +55,19 @@ public interface HFileHistogram {
public final String HFILEHISTOGRAM_BINCOUNT = "hfile.histogram.bin.count";
public final int DEFAULT_HFILEHISTOGRAM_BINCOUNT = 100;
+ @ThriftStruct
public static class Bucket implements Writable {
private byte[] startRow;
private byte[] endRow;
private double numRows;
private Map<HFileStat, Double> hfileStats;
- private Bucket(byte[] startRow, byte[] endRow, double numRows,
- Map<HFileStat, Double> hfileStats) {
+ @ThriftConstructor
+ public Bucket(
+ @ThriftField(1) byte[] startRow,
+ @ThriftField(2) byte[] endRow,
+ @ThriftField(3) double numRows,
+ @ThriftField(4) Map<HFileStat, Double> hfileStats) {
this.startRow = startRow;
this.endRow = endRow;
this.numRows = numRows;
@@ -67,33 +77,50 @@ public interface HFileHistogram {
public Bucket() {
}
- public double getCount() {
- return numRows;
+ /**
+ * @return a copy of the startRow.
+ */
+ @ThriftField(1)
+ public byte[] getStartRow() {
+ return Bytes.copyOfByteArray(this.startRow);
}
/**
- * Returns the number of key values that this bucket holds.
- *
+ * @return a copy of the endRow.
+ */
+ @ThriftField(2)
+ public byte[] getEndRow() {
+ return Bytes.copyOfByteArray(this.endRow);
+ }
+
+ @ThriftField(3)
+ /**
+ * Returns the number of rows in this bucket.
* @return
*/
- public double getNumKvs() {
- return this.hfileStats.get(HFileStat.KEYVALUECOUNT);
+ public double getCount() {
+ return numRows;
}
/**
- * @return returns a copy of the endRow
+ * Returns the underlying map of HFileStats.
+ * @return the underlying hfileStats map
*/
- public byte[] getEndRow() {
- return Bytes.copyOfByteArray(this.endRow);
+ @ThriftField(4)
+ public Map<HFileStat, Double> getStatsMap() {
+ return this.hfileStats;
}
/**
- * @return returns a copy of last
+ * Returns the number of key values that this bucket holds.
+ *
+ * @return the KEYVALUECOUNT stat
*/
- public byte[] getStartRow() {
- return Bytes.copyOfByteArray(this.startRow);
+ public double getNumKvs() {
+ return this.hfileStats.get(HFileStat.KEYVALUECOUNT);
}
+
@Override
public void readFields(DataInput in) throws IOException {
this.startRow = Bytes.readByteArray(in);
@@ -238,16 +265,6 @@ public interface HFileHistogram {
public Writable serialize();
/**
- * Composes a list of HFileHistograms and returns a HFileHistogram which is a
- * merge of all the given Histograms. Assumes that the HFileHistogram objects
- * in the list are of the same type as this object.
- *
- * @param histograms
- * @return
- */
- public HFileHistogram compose(List<HFileHistogram> histograms);
-
- /**
* Method to deserialize the histogram from the HFile. This is the inverse of
* the serialize function.
*
@@ -260,4 +277,13 @@ public interface HFileHistogram {
public HFileHistogram merge(HFileHistogram h2);
public int getBinCount();
+
+ /**
+ * Creates a new, empty instance of the same histogram type as this object.
+ *
+ * @param binCount
+ * the number of bins.
+ * @return a new HFileHistogram instance.
+ */
+ public HFileHistogram create(int binCount);
}
Copied: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HistogramUtils.java (from r1576907, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/YouAreDeadException.java)
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HistogramUtils.java?p2=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HistogramUtils.java&p1=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/YouAreDeadException.java&r1=1576907&r2=1576909&rev=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/YouAreDeadException.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HistogramUtils.java Wed Mar 12 21:17:13 2014
@@ -1,5 +1,5 @@
-/**
- * Copyright 2010 The Apache Software Foundation
+/*
+ * Copyright 2014 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -17,18 +17,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase;
+package org.apache.hadoop.hbase.io.hfile.histogram;
import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.regionserver.Store;
/**
- * This exception is thrown by the master when a region server reports and is
- * already being processed as dead. This can happen when a region server loses
- * its session but didn't figure it yet.
+ * Utilities for histogram computing.
*/
-public class YouAreDeadException extends IOException {
+public class HistogramUtils {
+ /**
+ * Merges histograms from a collection of Stores.
+ *
+ * @return null if none of the stores has an HFileHistogram; a merged
+ * HFileHistogram otherwise.
+ */
+ public static HFileHistogram mergeOfStores(Collection<Store> stores)
+ throws IOException {
+ HFileHistogram h = null;
+
+ for (Store s : stores) {
+ HFileHistogram hist = s.getHistogram();
- public YouAreDeadException(String message) {
- super(message);
+ if (hist != null) {
+ if (h == null) {
+ h = hist.create(hist.getBinCount());
+ }
+
+ h.merge(hist);
+ }
+ }
+ return h;
}
}
+
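
HistogramUtils.mergeOfStores is the replacement for the per-implementation compose() method deleted later in this commit: callers obtain an empty histogram of the right type via create(binCount) and fold the rest in with merge(). A sketch of the same fold over a plain list, using only the HFileHistogram interface methods shown above:

    import java.util.List;
    import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram;

    public class HistogramFold {
      // Same shape as mergeOfStores, but over a pre-fetched list; returns null
      // when no histogram is available, mirroring the committed code.
      static HFileHistogram mergeAll(List<HFileHistogram> histograms) {
        HFileHistogram merged = null;
        for (HFileHistogram hist : histograms) {
          if (hist == null) continue;
          if (merged == null) {
            merged = hist.create(hist.getBinCount()); // empty, same type/bins
          }
          merged.merge(hist);
        }
        return merged;
      }
    }
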
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HiveBasedNumericHistogram.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HiveBasedNumericHistogram.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HiveBasedNumericHistogram.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HiveBasedNumericHistogram.java Wed Mar 12 21:17:13 2014
@@ -50,6 +50,7 @@ public class HiveBasedNumericHistogram i
double y;
Map<Enum<?>, Double> stats;
+ @Override
public int compareTo(Coord o) {
if (x < o.x) {
return -1;
@@ -132,6 +133,7 @@ public class HiveBasedNumericHistogram i
* @param num_bins
* Number of non-uniform-width histogram bins to use
*/
+ @Override
public void allocate(int numBins) {
nbins = numBins;
bins = new ArrayList<Coord>();
@@ -146,6 +148,7 @@ public class HiveBasedNumericHistogram i
* A serialized histogram created by the serialize() method
* @see #merge
*/
+ @Override
public NumericHistogram merge(NumericHistogram hist) {
Preconditions.checkNotNull(hist);
Preconditions.checkArgument(hist instanceof HiveBasedNumericHistogram);
@@ -205,6 +208,7 @@ public class HiveBasedNumericHistogram i
* @param v
* The data point to add to the histogram approximation.
*/
+ @Override
public void add(double v) {
// Binary search to find the closest bucket that v should go into.
// 'bin' should be interpreted as the bin to shift right in order to
@@ -411,7 +415,7 @@ public class HiveBasedNumericHistogram i
List<Bucket> buckets = Lists.newArrayList();
Preconditions.checkArgument(hist != null);
Preconditions.checkArgument(hist.bins != null);
- if (hist.bins.size() == 0) {
+ if (hist.bins.isEmpty()) {
return buckets;
}
buckets.add(new Bucket(this.minusInfinity, hist.bins.get(0).x, hist.bins
@@ -451,6 +455,7 @@ public class HiveBasedNumericHistogram i
/**
* Constructs a uniform histogram and returns the list of buckets.
*/
+ @Override
public List<Bucket> getUniformBuckets() {
return getBuckets(uniform(this.getUsedBins()));
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/UniformSplitHFileHistogram.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/UniformSplitHFileHistogram.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/UniformSplitHFileHistogram.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/UniformSplitHFileHistogram.java Wed Mar 12 21:17:13 2014
@@ -29,6 +29,7 @@ import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
@@ -202,22 +203,6 @@ public class UniformSplitHFileHistogram
}.setVal(getUniformBuckets());
}
- /**
- * Modifies the elements in the list of histograms.
- */
- @Override
- public HFileHistogram compose(List<HFileHistogram> histograms) {
- if (histograms.size() <= 0)
- return null;
- HFileHistogram h = histograms.get(0);
- int binCnt = h.getBinCount();
- HFileHistogram ret = new UniformSplitHFileHistogram(binCnt);
- for (HFileHistogram h2 : histograms) {
- ret = ret.merge(h2);
- }
- return ret;
- }
-
@Override
public HFileHistogram deserialize(ByteBuffer buf) throws IOException {
if (buf == null) return null;
@@ -232,7 +217,9 @@ public class UniformSplitHFileHistogram
buckets.add(b);
}
bais.close();
- if (buckets.size() == 0) return null;
+ if (buckets.isEmpty()) {
+ return null;
+ }
HFileHistogram ret = getHistogram(buckets);
return ret;
}
@@ -250,4 +237,9 @@ public class UniformSplitHFileHistogram
public int getBinCount() {
return this.underlyingHistogram.getBinCount();
}
+
+ @Override
+ public HFileHistogram create(int binCount) {
+ return new UniformSplitHFileHistogram(binCount);
+ }
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Wed Mar 12 21:17:13 2014
@@ -97,6 +97,7 @@ public class HBaseClient {
final protected long failureSleep; // Time to sleep before retry on failure.
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives
+ protected final int ipTosValue; // specify a datagram's type-of-service priority
protected int pingInterval; // how often sends ping to the server in msecs
private final int connectionTimeOutMillSec; // the connection time out
@@ -194,9 +195,9 @@ public class HBaseClient {
}
public void setVersion(int version) {
- this.version = version;
+ this.version = version;
}
-
+
public int getVersion() {
return version;
}
@@ -336,7 +337,7 @@ public class HBaseClient {
this.socket.setTcpNoDelay(tcpNoDelay);
this.socket.setKeepAlive(tcpKeepAlive);
NetUtils.connect(this.socket, remoteId.getAddress(),
- connectionTimeOutMillSec);
+ connectionTimeOutMillSec, ipTosValue);
if (remoteId.rpcTimeout > 0) {
pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
}
@@ -605,7 +606,7 @@ public class HBaseClient {
Decompressor decompressor = null;
try {
DataInputStream localIn = in;
-
+
// 1. Read the call id uncompressed which is an int
int id = localIn.readInt();
if (LOG.isDebugEnabled())
@@ -620,9 +621,9 @@ public class HBaseClient {
String compressionAlgoName = localIn.readUTF();
rpcCompression =
Compression.getCompressionAlgorithmByName(compressionAlgoName);
-
+
// 4. setup the correct decompressor (if any)
- if (rpcCompression != Compression.Algorithm.NONE) {
+ if (rpcCompression != Compression.Algorithm.NONE) {
decompressor = rpcCompression.getDecompressor();
InputStream is = rpcCompression.createDecompressionStream(
in, decompressor, 0);
@@ -792,6 +793,7 @@ public class HBaseClient {
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", false);
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
+ this.ipTosValue = conf.getInt("hbase.ipc.client.tos.value", NetUtils.NOT_SET_IP_TOS);
this.pingInterval = getPingInterval(conf);
if (LOG.isDebugEnabled()) {
LOG.debug("The ping interval is" + this.pingInterval + "ms.");
@@ -1011,7 +1013,7 @@ public class HBaseClient {
* connectionsId object and with set() method. We need to manage the
* refs for keys in HashMap properly. For now its ok.
*/
- ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout,
+ ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout,
call.getVersion(), connectionNum);
do {
connection = connections.get(remoteId);
@@ -1081,7 +1083,7 @@ public class HBaseClient {
/**
* This class holds the address and the user ticket. The client connections
- * to servers are uniquely identified by
+ * to servers are uniquely identified by
* <remoteAddress, ticket, RPC version>
*/
private static class ConnectionId {
@@ -1121,7 +1123,7 @@ public class HBaseClient {
@Override
public int hashCode() {
- return address.hashCode() ^ System.identityHashCode(ticket) ^
+ return address.hashCode() ^ System.identityHashCode(ticket) ^
rpcTimeout ^ version ^ connectionNum;
}
}
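
The HBaseClient hunks thread a new IP type-of-service byte from configuration into NetUtils.connect, so client RPC sockets can be marked for network prioritization. A sketch of how a client would opt in (the value 96, DSCP CS3 shifted into the TOS byte, is only an example; left unset, the client passes NetUtils.NOT_SET_IP_TOS and the socket keeps the OS default):

    import org.apache.hadoop.conf.Configuration;

    public class TosConfigDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // New in this commit: a TOS byte applied to HBaseClient connections.
        conf.setInt("hbase.ipc.client.tos.value", 96);
        // An HBaseClient built from this conf connects with the TOS value set.
      }
    }
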
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Wed Mar 12 21:17:13 2014
@@ -20,41 +20,39 @@
package org.apache.hadoop.hbase.ipc;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.net.SocketFactory;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.util.ParamFormatHelper;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.UserGroupInformation;
-
import org.codehaus.jackson.map.ObjectMapper;
-import javax.net.SocketFactory;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
/** A simple RPC mechanism.
*
* This is a local hbase copy of the hadoop RPC so we can do things like
@@ -381,7 +379,7 @@ public class HBaseRPC {
(VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol},
new Invoker(addr, ticket, conf, factory, rpcTimeout, options));
-
+
Long serverVersion = versions.get (addr);
if (serverVersion == null) {
serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion);
@@ -504,7 +502,7 @@ public class HBaseRPC {
this(instance, conf, bindAddress, port, 1, false);
}
- private static String classNameBase(String className) {
+ public static String classNameBase(String className) {
String[] names = className.split("\\.", -1);
if (names == null || names.length == 0) {
return className;
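
classNameBase() is promoted from private to public in this hunk, and its tail is cut off by the diff context, so the final return below is an assumption. The visible contract is to strip the package prefix from a fully qualified class name:

// Hedged sketch of classNameBase(); the last return is assumed, since the
// hunk above truncates the method body.
public class ClassNameBaseSketch {
  public static String classNameBase(String className) {
    String[] names = className.split("\\.", -1);
    if (names == null || names.length == 0) {
      return className;
    }
    return names[names.length - 1]; // assumed: keep only the simple name
  }

  public static void main(String[] args) {
    // prints "HBaseRPC"
    System.out.println(classNameBase("org.apache.hadoop.hbase.ipc.HBaseRPC"));
  }
}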
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java Wed Mar 12 21:17:13 2014
@@ -13,28 +13,28 @@ public class HBaseRPCOptions implements
public static final HBaseRPCOptions DEFAULT = new HBaseRPCOptions ();
private static final byte VERSION_INITIAL = 1;
-
+
private byte version = VERSION_INITIAL;
private Compression.Algorithm rxCompression = Compression.Algorithm.NONE;
private Compression.Algorithm txCompression = Compression.Algorithm.NONE;
- private boolean requestProfiling = false;
- private String tag = null;
-
- // this will be used as profiling data in htable so it's possible to
- // set it after receiving profiling data. do not need to serialize this.
- public ProfilingData profilingResult = null;
-
- public HBaseRPCOptions () {}
-
- public void setVersion (byte version) {
- this.version = version;
- }
-
- public byte getVersion () {
+ private boolean requestProfiling = false;
+ private String tag = null;
+
+ // this will be used as profiling data in htable so it's possible to
+ // set it after receiving profiling data. do not need to serialize this.
+ public ProfilingData profilingResult = null;
+
+ public HBaseRPCOptions () {}
+
+ public void setVersion (byte version) {
+ this.version = version;
+ }
+
+ public byte getVersion () {
return this.version;
}
-
- public void setRxCompression(Compression.Algorithm compressionAlgo) {
+
+ public void setRxCompression(Compression.Algorithm compressionAlgo) {
this.rxCompression = compressionAlgo;
}
@@ -49,13 +49,13 @@ public class HBaseRPCOptions implements
public Compression.Algorithm getTxCompression() {
return this.txCompression;
}
-
+
/**
* Set whether to request profiling data from the server
*
* @param request request profiling or not
*/
- public void setRequestProfiling (boolean request) {
+ public void setRequestProfiling (boolean request) {
this.requestProfiling = request;
}
@@ -77,26 +77,26 @@ public class HBaseRPCOptions implements
public String getTag () {
return this.tag;
}
-
- @Override
+
+ @Override
public void write(DataOutput out) throws IOException {
- // 1. write the object version
- out.writeByte(this.version);
-
- // 2. write the compression algo used to compress the request being sent
+ // 1. write the object version
+ out.writeByte(this.version);
+
+ // 2. write the compression algo used to compress the request being sent
out.writeUTF(this.txCompression.getName());
// 3. write the compression algo to use for the response
out.writeUTF(this.rxCompression.getName());
// 4. write profiling request flag
- out.writeBoolean(this.requestProfiling);
-
- // 5. write tag flag and tag if flag is true
- out.writeBoolean(this.tag != null ? true : false);
- if (this.tag != null) {
- out.writeUTF(this.tag);
- }
+ out.writeBoolean(this.requestProfiling);
+
+ // 5. write tag flag and tag if flag is true
+ out.writeBoolean(this.tag != null ? true : false);
+ if (this.tag != null) {
+ out.writeUTF(this.tag);
+ }
}
@Override
@@ -117,4 +117,13 @@ public class HBaseRPCOptions implements
this.tag = in.readUTF ();
}
}
+
+ @Override
+ public String toString() {
+ return "HBaseRPCOptions [version=" + version + ", rxCompression="
+ + rxCompression + ", txCompression=" + txCompression
+ + ", requestProfiling=" + requestProfiling + ", tag=" + tag
+ + ", profilingResult=" + profilingResult + "]";
+ }
+
}
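
The reindented write() above pins down a five-step wire layout: version byte, request codec name, response codec name, profiling flag, then an optional tag guarded by a boolean. A minimal sketch of a reader that mirrors that order, with plain strings standing in for Compression.Algorithm:

// Illustrative reader for the five-step layout written by
// HBaseRPCOptions.write() above; not the HBase class itself.
import java.io.DataInput;
import java.io.IOException;

public class RpcOptionsReaderSketch {
  public byte version;
  public String txCompression;
  public String rxCompression;
  public boolean requestProfiling;
  public String tag; // stays null when the flag byte was false

  public void readFields(DataInput in) throws IOException {
    version = in.readByte();             // 1. object version
    txCompression = in.readUTF();        // 2. request codec
    rxCompression = in.readUTF();        // 3. response codec
    requestProfiling = in.readBoolean(); // 4. profiling flag
    if (in.readBoolean()) {              // 5. optional tag
      tag = in.readUTF();
    }
  }
}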
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Wed Mar 12 21:17:13 2014
@@ -43,7 +43,13 @@ import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -58,6 +64,10 @@ import org.apache.hadoop.hbase.monitorin
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.throttles.SizeBasedMultiThrottler;
+import org.apache.hadoop.hbase.util.throttles.SizeBasedThrottler;
+import org.apache.hadoop.hbase.util.throttles.SizeBasedThrottlerInterface;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
@@ -66,10 +76,10 @@ import org.apache.hadoop.io.compress.Dec
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.util.throttles.SizeBasedMultiThrottler;
-import org.apache.hadoop.hbase.util.throttles.SizeBasedThrottler;
-import org.apache.hadoop.hbase.util.throttles.SizeBasedThrottlerInterface;
+
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -226,6 +236,7 @@ public abstract class HBaseServer {
}
/** A call queued for handling. */
+ @ThriftStruct
public static class Call {
protected int id; // the client's call id
protected Writable param; // the parameter passed
@@ -251,6 +262,54 @@ public abstract class HBaseServer {
this.timestamp = timestamp;
}
+ /**
+ * Thrift constructor
+ * TODO: serialize more fields; this is just a trial
+ * @param id
+ * @param partialResponseSize
+ * @param shouldProfile
+ */
+ @ThriftConstructor
+ public Call(@ThriftField(1) int id,
+ @ThriftField(2) long partialResponseSize,
+ @ThriftField(3) boolean shouldProfile,
+ @ThriftField(4) ByteBuffer response,
+ @ThriftField(5) long timestamp,
+ @ThriftField(6) ProfilingData profilingData){
+ this.id = id;
+ this.partialResponseSize = partialResponseSize;
+ this.shouldProfile = shouldProfile;
+ this.response = response;
+ this.timestamp = timestamp;
+ this.profilingData = profilingData;
+ }
+
+ /**
+ * This constructor should be used when the client sends a header to the
+ * server; in practice we just need to know whether the call needs to be
+ * profiled or not.
+ *
+ * @param options
+ */
+ public Call(HBaseRPCOptions options) {
+ this.shouldProfile = options.getRequestProfiling();
+ }
+
+ @ThriftField(1)
+ public int getId() {
+ return id;
+ }
+
+ @ThriftField(5)
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @ThriftField(4)
+ public ByteBuffer getResponse() {
+ return response;
+ }
+
public void setTag(String tag) {
this.tag = tag;
}
@@ -275,6 +334,7 @@ public abstract class HBaseServer {
return this.compressionAlgo;
}
+ @ThriftField(2)
public long getPartialResponseSize() {
return partialResponseSize;
}
@@ -287,18 +347,37 @@ public abstract class HBaseServer {
return this.connection;
}
- @Override
- public String toString() {
- return param.toString() + " from " + connection.toString();
- }
-
public void setResponse(ByteBuffer response) {
this.response = response;
}
+ @ThriftField(6)
public ProfilingData getProfilingData(){
return this.profilingData;
}
+
+ @ThriftField(3)
+ public boolean isShouldProfile() {
+ return shouldProfile;
+ }
+
+ public void setShouldProfile(boolean shouldProfile) {
+ this.shouldProfile = shouldProfile;
+ }
+
+ public void setProfilingData(ProfilingData profilingData) {
+ this.profilingData = profilingData;
+ }
+
+ @Override
+ public String toString() {
+ return "Call [id=" + id + ", param=" + param + ", connection="
+ + connection + ", timestamp=" + timestamp + ", response=" + response
+ + ", compressionAlgo=" + compressionAlgo + ", version=" + version
+ + ", shouldProfile=" + shouldProfile + ", profilingData="
+ + profilingData + ", tag=" + tag + ", partialResponseSize="
+ + partialResponseSize + "]";
+ }
}
/** Listens on the socket, accepts new connections and hands them off to readers*/
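
The Call class above picks up com.facebook.swift annotations. The convention the hunk follows is that every @ThriftField id on a constructor parameter is paired with a getter carrying the same id. A minimal self-contained sketch of that pairing, assuming swift-codec is on the classpath:

// Minimal swift-codec struct in the style of Call above; the class and
// field names are illustrative.
import com.facebook.swift.codec.ThriftConstructor;
import com.facebook.swift.codec.ThriftField;
import com.facebook.swift.codec.ThriftStruct;

@ThriftStruct
public class CallSketch {
  private final int id;
  private final long timestamp;

  @ThriftConstructor
  public CallSketch(@ThriftField(1) int id, @ThriftField(2) long timestamp) {
    this.id = id;
    this.timestamp = timestamp;
  }

  @ThriftField(1)
  public int getId() { return id; }

  @ThriftField(2)
  public long getTimestamp() { return timestamp; }
}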
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HConnectionParams.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HConnectionParams.java?rev=1576909&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HConnectionParams.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HConnectionParams.java Wed Mar 12 21:17:13 2014
@@ -0,0 +1,84 @@
+/**
+ * Copyright 2013 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+
+/**
+ * This class preserves parameters of connections to region servers.
+ */
+public class HConnectionParams {
+ private static HConnectionParams instance = null;
+
+ private int numRetries;
+ private final long pause;
+ private final int rpcTimeout;
+ private final long rpcRetryTimeout;
+
+ // The number of times we will retry after receiving a RegionOverloadedException from the
+ // region server. Defaults to 0 (i.e. we will throw the exception and let the client handle
+ // retries), which may not always be what you want. But for the purposes of the HBaseThrift
+ // client, which this is created for, we do not want the thrift layer to hold up IPC threads
+ // handling retries.
+ private int maxServerRequestedRetries;
+
+ public static HConnectionParams getInstance(Configuration conf) {
+ if (instance == null) {
+ instance = new HConnectionParams(conf);
+ }
+
+ return instance;
+ }
+
+ private HConnectionParams(Configuration conf) {
+ this.numRetries = conf.getInt(HConstants.CLIENT_RETRY_NUM_STRING, HConstants.DEFAULT_CLIENT_RETRY_NUM);
+ this.maxServerRequestedRetries = conf.getInt(
+ HConstants.SERVER_REQUESTED_RETRIES_STRING, HConstants.DEFAULT_SERVER_REQUESTED_RETRIES);
+
+
+ this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+ this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ this.rpcRetryTimeout = conf.getLong(
+ HConstants.CLIENT_RPC_RETRY_TIMEOUT_STRING, HConstants.DEFAULT_CLIENT_RPC_RETRY_TIMEOUT);
+ }
+
+ public int getNumRetries() {
+ return this.numRetries;
+ }
+
+ public int getMaxServerRequestedRetries() {
+ return this.maxServerRequestedRetries;
+ }
+
+ public long getPauseTime(int tries) {
+ if (tries >= HConstants.RETRY_BACKOFF.length)
+ tries = HConstants.RETRY_BACKOFF.length - 1;
+ return this.pause * HConstants.RETRY_BACKOFF[tries];
+ }
+
+ public int getRpcTimeout() {
+ return this.rpcTimeout;
+ }
+
+ public long getRpcRetryTimeout() {
+ return this.rpcRetryTimeout;
+ }
+}
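
getPauseTime() above clamps the retry index into HConstants.RETRY_BACKOFF and scales the configured base pause by the selected multiplier. A standalone sketch of the same clamp-and-scale, with an illustrative table (the real RETRY_BACKOFF values and default pause may differ):

// Clamp-and-scale retry backoff in the style of getPauseTime() above.
// The table and base pause below are assumptions for illustration.
public class BackoffSketch {
  private static final int[] RETRY_BACKOFF = {1, 1, 1, 2, 2, 4, 4, 8, 16, 32};
  private final long pause = 1000L; // base pause in ms

  public long getPauseTime(int tries) {
    if (tries >= RETRY_BACKOFF.length) {
      tries = RETRY_BACKOFF.length - 1; // clamp to the last multiplier
    }
    return pause * RETRY_BACKOFF[tries];
  }

  public static void main(String[] args) {
    BackoffSketch b = new BackoffSketch();
    for (int t = 0; t < 12; t++) {
      System.out.println("try " + t + " -> sleep " + b.getPauseTime(t) + " ms");
    }
  }
}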
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java Wed Mar 12 21:17:13 2014
@@ -40,7 +40,7 @@ import org.apache.hadoop.io.Writable;
* number in HBaseRPCProtocolVersion
*
*/
-public interface HMasterInterface extends HBaseRPCProtocolVersion {
+public interface HMasterInterface extends HBaseRPCProtocolVersion, ThriftClientInterface {
/** @return true if master is available */
public boolean isMasterRunning();
@@ -78,7 +78,7 @@ public interface HMasterInterface extend
/**
* Batch adds, modifies, and deletes columns from the specified table.
- * Any of the lists may be null, in which case those types of alterations
+ * Any of the lists may be null, in which case those types of alterations
* will not occur.
*
* @param tableName table to modify
@@ -186,7 +186,7 @@ public interface HMasterInterface extend
/**
* Used by the client to get the number of regions that have received the
* updated schema
- *
+ *
* @param tableName
* @return Pair getFirst() is the number of regions pending an update
* getSecond() total number of regions of the table
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java Wed Mar 12 21:17:13 2014
@@ -35,7 +35,7 @@ import java.io.IOException;
* number in HBaseRPCProtocolVersion
*
*/
-public interface HMasterRegionInterface extends HBaseRPCProtocolVersion {
+public interface HMasterRegionInterface extends HBaseRPCProtocolVersion, ThriftClientInterface {
/**
* Called when a region server first starts
@@ -75,4 +75,4 @@ public interface HMasterRegionInterface
*/
public long getLastFlushedSequenceId(byte[] regionName)
throws IOException;
-}
\ No newline at end of file
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Wed Mar 12 21:17:13 2014
@@ -48,7 +48,8 @@ import org.apache.hadoop.io.MapWritable;
* <p>NOTE: if you change the interface, you must change the RPC version
* number in HBaseRPCProtocolVersion
*/
-public interface HRegionInterface extends HBaseRPCProtocolVersion, Restartable, Stoppable {
+public interface HRegionInterface extends HBaseRPCProtocolVersion, Restartable,
+ Stoppable, ThriftClientInterface {
/**
* Get metainfo about an HRegion
*
@@ -59,7 +60,6 @@ public interface HRegionInterface extend
public HRegionInfo getRegionInfo(final byte [] regionName)
throws NotServingRegionException;
-
/**
* Return all the data for the row that matches <i>row</i> exactly,
* or the one that immediately precedes it.
@@ -78,7 +78,7 @@ public interface HRegionInterface extend
*
* @return the regions served by this regionserver
*/
- public HRegion [] getOnlineRegionsAsArray();
+ public HRegion[] getOnlineRegionsAsArray();
/**
* Flush the given region
@@ -131,8 +131,8 @@ public interface HRegionInterface extend
* @param CF names
* @return the list of store files
*/
- public List<String> getStoreFileList(byte[] regionName, byte[][] columnFamilies)
- throws IllegalArgumentException;
+ public List<String> getStoreFileList(byte[] regionName,
+ byte[][] columnFamilies) throws IllegalArgumentException;
/**
* Get a list of store files for all CFs in a particular region
@@ -147,16 +147,18 @@ public interface HRegionInterface extend
* included in the list returned
* @return list of HLog files
*/
- public List<String> getHLogsList(boolean rollCurrentHLog) throws IOException;
+ public List<String> getHLogsList(boolean rollCurrentHLog)
+ throws IOException;
/**
+ * TODO: deprecate this
* Perform Get operation.
* @param regionName name of region to get from
* @param get Get operation
* @return Result
* @throws IOException e
*/
- public Result get(byte [] regionName, Get get) throws IOException;
+ public Result get(byte[] regionName, Get get) throws IOException;
public Result[] get(byte[] regionName, List<Get> gets)
throws IOException;
@@ -212,7 +214,7 @@ public interface HRegionInterface extend
* @throws IOException e
*/
public int delete(final byte[] regionName, final List<Delete> deletes)
- throws IOException;
+ throws IOException;
/**
* Atomically checks if a row/family/qualifier value match the expectedValue.
@@ -228,10 +230,9 @@ public interface HRegionInterface extend
* @throws IOException e
* @return true if the new put was executed, false otherwise
*/
- public boolean checkAndPut(final byte[] regionName, final byte [] row,
- final byte [] family, final byte [] qualifier, final byte [] value,
- final Put put)
- throws IOException;
+ public boolean checkAndPut(final byte[] regionName, final byte[] row,
+ final byte[] family, final byte[] qualifier, final byte[] value,
+ final Put put) throws IOException;
/**
@@ -248,10 +249,9 @@ public interface HRegionInterface extend
* @throws IOException e
* @return true if the new delete was executed, false otherwise
*/
- public boolean checkAndDelete(final byte[] regionName, final byte [] row,
- final byte [] family, final byte [] qualifier, final byte [] value,
- final Delete delete)
- throws IOException;
+ public boolean checkAndDelete(final byte[] regionName, final byte[] row,
+ final byte[] family, final byte[] qualifier, final byte[] value,
+ final Delete delete) throws IOException;
/**
* Atomically increments a column value. If the column value isn't long-like,
@@ -267,9 +267,9 @@ public interface HRegionInterface extend
* @return new incremented column value
* @throws IOException e
*/
- public long incrementColumnValue(byte [] regionName, byte [] row,
- byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
- throws IOException;
+ public long incrementColumnValue(byte[] regionName, byte[] row,
+ byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
+ throws IOException;
/**
* Get a configuration property from an HRegion
@@ -292,21 +292,21 @@ public interface HRegionInterface extend
* @return scannerId scanner identifier used in other calls
* @throws IOException e
*/
- public long openScanner(final byte [] regionName, final Scan scan)
- throws IOException;
+ public long openScanner(final byte[] regionName, final Scan scan)
+ throws IOException;
public void mutateRow(byte[] regionName, RowMutations arm)
throws IOException;
public void mutateRow(byte[] regionName, List<RowMutations> armList)
throws IOException;
-
/**
- * Get the next set of values
+ * Get the next set of values. Do not use with Thrift.
* @param scannerId clientId passed to openScanner
* @return map of values; returns null if no results.
* @throws IOException e
*/
+ @Deprecated
public Result next(long scannerId) throws IOException;
/**
@@ -318,7 +318,8 @@ public interface HRegionInterface extend
* filter rules that the scan is done).
* @throws IOException e
*/
- public Result [] next(long scannerId, int numberOfRows) throws IOException;
+ public Result[] next(long scannerId, int numberOfRows)
+ throws IOException;
/**
* Close a scanner
@@ -336,8 +337,8 @@ public interface HRegionInterface extend
* @return lockId lock identifier
* @throws IOException e
*/
- public long lockRow(final byte [] regionName, final byte [] row)
- throws IOException;
+ public long lockRow(final byte[] regionName, final byte[] row)
+ throws IOException;
/**
* Releases a remote row lock.
@@ -346,8 +347,8 @@ public interface HRegionInterface extend
* @param lockId the lock id returned by lockRow
* @throws IOException e
*/
- public void unlockRow(final byte [] regionName, final long lockId)
- throws IOException;
+ public void unlockRow(final byte[] regionName, final long lockId)
+ throws IOException;
/**
@@ -400,16 +401,16 @@ public interface HRegionInterface extend
/**
* Update the assignment plan for each region server.
- * @param updatedFavoredNodesMap
+ * @param plan
*/
public int updateFavoredNodes(AssignmentPlan plan)
- throws IOException;
+ throws IOException;
/**
* Update the configuration.
*/
- public void updateConfiguration() throws IOException;
-
+ public void updateConfiguration();
+
/**
* Stop this service.
* @param why Why we're stopping.
@@ -417,14 +418,12 @@ public interface HRegionInterface extend
public void stop(String why);
/** @return why we are stopping */
- String getStopReason();
-
+ public String getStopReason();
/**
* Set the number of threads to be used for HDFS Quorum reads
*
- * @param maxThreads. quourm reads will be disabled if set to <= 0
- *
+ * @param maxThreads quorum reads will be disabled if set to <= 0
*/
public void setNumHDFSQuorumReadThreads(int maxThreads);
@@ -432,8 +431,7 @@ public interface HRegionInterface extend
* Set the amount of time we wait before initiating a second read when
* using HDFS Quorum reads
*
- * @param timeoutMillis.
- *
+ * @param timeoutMillis
*/
public void setHDFSQuorumReadTimeoutMillis(long timeoutMillis);
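
Several hunks above tighten the scanner contract: the single-row next(scannerId) is deprecated in favor of the batched next(scannerId, numberOfRows). A hedged client-side sketch of the open/next/close lifecycle against this interface (how the proxy is obtained is out of scope here):

// Drains a region scanner through HRegionInterface using the batched
// next() call; batch size 100 is an arbitrary choice for the sketch.
import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.ipc.HRegionInterface;

public class ScanLoopSketch {
  public static void drain(HRegionInterface server, byte[] regionName,
      Scan scan) throws IOException {
    long scannerId = server.openScanner(regionName, scan);
    try {
      Result[] batch;
      while ((batch = server.next(scannerId, 100)) != null
          && batch.length > 0) {
        for (Result r : batch) {
          // process r here
        }
      }
    } finally {
      server.close(scannerId); // always release the server-side scanner
    }
  }
}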
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ProfilingData.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ProfilingData.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ProfilingData.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ProfilingData.java Wed Mar 12 21:17:13 2014
@@ -3,23 +3,28 @@ package org.apache.hadoop.hbase.ipc;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.commons.lang.mutable.MutableFloat;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.lang.mutable.MutableLong;
-import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.io.Writable;
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+
/**
* A map containing profiling data, mapping String to
* String, Long, Int, Boolean, and Float. This class is
* not thread-safe.
*/
-
+@ThriftStruct
public class ProfilingData implements Writable {
/**
@@ -104,6 +109,35 @@ public class ProfilingData implements Wr
public ProfilingData() {}
+ @ThriftConstructor
+ public ProfilingData(
+ @ThriftField(1) Map<String, String> mapString,
+ @ThriftField(2) Map<String, Long> mapLong,
+ @ThriftField(3) Map<String, Integer> mapInt,
+ @ThriftField(4) Map<String, Boolean> mapBoolean,
+ @ThriftField(5) Map<String, Float> mapFloat,
+ @ThriftField(6) Map<String, List<Integer>> mapHist) {
+ this.mapString = mapString;
+ this.mapLong = new HashMap<>();
+ for (Entry<String, Long> e : mapLong.entrySet()) {
+ this.mapLong.put(e.getKey(), new MutableLong(e.getValue()));
+ }
+ for (Entry<String, Integer> e : mapInt.entrySet()) {
+ this.mapInt.put(e.getKey(), new MutableInt(e.getValue()));
+ }
+ this.mapBoolean = mapBoolean;
+ for (Entry<String, Float> e : mapFloat.entrySet()) {
+ this.mapFloat.put(e.getKey(), new MutableFloat(e.getValue()));
+ }
+ for (Entry<String, List<Integer>> e : mapHist.entrySet()) {
+ int[] intArray = new int[e.getValue().size()];
+ for (int i = 0; i< e.getValue().size(); i++) {
+ intArray[i] = e.getValue().get(i);
+ }
+ this.mapHist.put(e.getKey(), intArray);
+ }
+ }
+
public void addString(String key, String val) {
mapString.put(key, val);
}
@@ -414,6 +448,56 @@ public class ProfilingData implements Wr
return sb.toString();
}
+ @ThriftField(1)
+ public Map<String, String> getMapString() {
+ return mapString;
+ }
+
+ @ThriftField(2)
+ public Map<String, Long> getMapLong() {
+ Map<String, Long> map = new HashMap<>();
+ for (Entry<String, MutableLong> e : mapLong.entrySet()) {
+ map.put(e.getKey(), e.getValue().toLong());
+ }
+ return map;
+ }
+
+ @ThriftField(3)
+ public Map<String, Integer> getMapInt() {
+ Map<String, Integer> map = new HashMap<>();
+ for (Entry<String, MutableInt> e : mapInt.entrySet()) {
+ map.put(e.getKey(), e.getValue().toInteger());
+ }
+ return map;
+ }
+
+ @ThriftField(4)
+ public Map<String, Boolean> getMapBoolean() {
+ return mapBoolean;
+ }
+
+ @ThriftField(5)
+ public Map<String, Float> getMapFloat() {
+ Map<String, Float> map = new HashMap<>();
+ for (Entry<String, MutableFloat> e : mapFloat.entrySet()) {
+ map.put(e.getKey(), e.getValue().toFloat());
+ }
+ return map;
+ }
+
+ @ThriftField(6)
+ public Map<String, List<Integer>> getMapHist() {
+ Map<String, List<Integer>> map = new HashMap<>();
+ for (Entry<String, int[]> e : mapHist.entrySet()) {
+ List<Integer> list = new ArrayList<>();
+ for (int i : e.getValue()) {
+ list.add(i);
+ }
+ map.put(e.getKey(), list);
+ }
+ return map;
+ }
+
@Override
public String toString() {
return this.toString(", ");
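
The new Thrift-facing accessors above convert between the internal commons-lang mutable maps and plain boxed maps, so a swift-codec serializer only ever sees java.util types. A short usage sketch; addLong() is assumed by analogy with the addString() shown earlier in the class:

// Round-trip sketch for the Thrift-facing getters added above.
import java.util.Map;

import org.apache.hadoop.hbase.ipc.ProfilingData;

public class ProfilingRoundTripSketch {
  public static void main(String[] args) {
    ProfilingData pd = new ProfilingData();
    pd.addString("region", "test,,1");
    pd.addLong("rpc.time.ms", 42L); // assumed helper, mirrors addString

    // What a swift-codec serializer would see:
    Map<String, Long> longs = pd.getMapLong();
    System.out.println(longs.get("rpc.time.ms")); // 42
  }
}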