You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2010/04/03 10:02:11 UTC
svn commit: r930491 - in /hadoop/hbase/trunk: ./
contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/
contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/
contrib/stargate/core/src/main/java/org/apache/hadoop/hb...
Author: apurtell
Date: Sat Apr 3 08:02:11 2010
New Revision: 930491
URL: http://svn.apache.org/viewvc?rev=930491&view=rev
Log:
HBASE-2403 [stargate] client HTable interface to REST connector
Added:
hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/RemoteHTable.java
hadoop/hbase/trunk/contrib/stargate/core/src/test/java/org/apache/hadoop/hbase/stargate/client/
hadoop/hbase/trunk/contrib/stargate/core/src/test/java/org/apache/hadoop/hbase/stargate/client/TestRemoteTable.java
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RowResource.java
hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RowSpec.java
hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/ScannerInstanceResource.java
hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/model/ScannerModel.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=930491&r1=930490&r2=930491&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Sat Apr 3 08:02:11 2010
@@ -479,6 +479,7 @@ Release 0.21.0 - Unreleased
HBASE-2388 Give a very explicit message when we figure a big GC pause
HBASE-2270 Improve how we handle recursive calls in ExplicitColumnTracker
and WildcardColumnTracker
+ HBASE-2402 [stargate] set maxVersions on gets
HBASE-2087 The wait on compaction because "Too many store files"
holds up all flushing
@@ -509,7 +510,7 @@ Release 0.21.0 - Unreleased
(Alexey Kovyrin via Stack)
HBASE-2327 [EC2] Allocate elastic IP addresses for ZK and master nodes
HBASE-2319 [stargate] multiuser mode: request shaping
- HBASE-2402 [stargate] set maxVersions on gets
+ HBASE-2403 [stargate] client HTable interface to REST connector
OPTIMIZATIONS
HBASE-410 [testing] Speed up the test suite
Modified: hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RowResource.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RowResource.java?rev=930491&r1=930490&r2=930491&view=diff
==============================================================================
--- hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RowResource.java (original)
+++ hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RowResource.java Sat Apr 3 08:02:11 2010
@@ -96,6 +96,7 @@ public class RowResource implements Cons
if (!generator.hasNext()) {
throw new WebApplicationException(Response.Status.NOT_FOUND);
}
+ int count = 0;
CellSetModel model = new CellSetModel();
KeyValue value = generator.next();
byte[] rowKey = value.getRow();
@@ -109,6 +110,9 @@ public class RowResource implements Cons
rowModel.addCell(
new CellModel(value.getFamily(), value.getQualifier(),
value.getTimestamp(), value.getValue()));
+ if (++count > rowspec.getMaxValues()) {
+ break;
+ }
value = generator.next();
} while (value != null);
model.addRow(rowModel);
Modified: hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RowSpec.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RowSpec.java?rev=930491&r1=930490&r2=930491&view=diff
==============================================================================
--- hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RowSpec.java (original)
+++ hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RowSpec.java Sat Apr 3 08:02:11 2010
@@ -37,8 +37,6 @@ public class RowSpec {
public static final long DEFAULT_START_TIMESTAMP = 0;
public static final long DEFAULT_END_TIMESTAMP = Long.MAX_VALUE;
- private static final String versionPrefix = "?v=";
-
private byte[] row = HConstants.EMPTY_START_ROW;
private byte[] endRow = null;
private TreeSet<byte[]> columns =
@@ -46,6 +44,7 @@ public class RowSpec {
private long startTime = DEFAULT_START_TIMESTAMP;
private long endTime = DEFAULT_END_TIMESTAMP;
private int maxVersions = HColumnDescriptor.DEFAULT_VERSIONS;
+ private int maxValues = Integer.MAX_VALUE;
public RowSpec(String path) throws IllegalArgumentException {
int i = 0;
@@ -55,7 +54,7 @@ public class RowSpec {
i = parseRowKeys(path, i);
i = parseColumns(path, i);
i = parseTimestamp(path, i);
- i = parseMaxVersions(path, i);
+ i = parseQueryParams(path, i);
}
private int parseRowKeys(final String path, int i)
@@ -201,14 +200,54 @@ public class RowSpec {
return i;
}
- private int parseMaxVersions(final String path, int i) {
- if (i >= path.length()) {
- return i;
- }
- String s = path.substring(i);
- if (s.startsWith(versionPrefix)) {
- this.maxVersions = Integer.valueOf(s.substring(versionPrefix.length()));
- i += s.length();
+ private int parseQueryParams(final String path, int i) {
+ while (i < path.length()) {
+ char c = path.charAt(i);
+ if (c != '?' && c != '&') {
+ break;
+ }
+ if (++i > path.length()) {
+ break;
+ }
+ char what = path.charAt(i);
+ if (++i > path.length()) {
+ break;
+ }
+ c = path.charAt(i);
+ if (c != '=') {
+ throw new IllegalArgumentException("malformed query parameter");
+ }
+ if (++i > path.length()) {
+ break;
+ }
+ switch (what) {
+ case 'm': {
+ StringBuilder sb = new StringBuilder();
+ while (i <= path.length()) {
+ c = path.charAt(i);
+ if (c < '0' || c > '9') {
+ i--;
+ break;
+ }
+ sb.append(c);
+ }
+ maxVersions = Integer.valueOf(sb.toString());
+ } break;
+ case 'n': {
+ StringBuilder sb = new StringBuilder();
+ while (i <= path.length()) {
+ c = path.charAt(i);
+ if (c < '0' || c > '9') {
+ i--;
+ break;
+ }
+ sb.append(c);
+ }
+ maxValues = Integer.valueOf(sb.toString());
+ } break;
+ default:
+ throw new IllegalArgumentException("unknown parameter '" + c + "'");
+ }
}
return i;
}
@@ -251,6 +290,14 @@ public class RowSpec {
this.maxVersions = maxVersions;
}
+ public int getMaxValues() {
+ return maxValues;
+ }
+
+ public void setMaxValues(final int maxValues) {
+ this.maxValues = maxValues;
+ }
+
public boolean hasColumns() {
return !columns.isEmpty();
}
@@ -325,6 +372,8 @@ public class RowSpec {
result.append(Long.toString(endTime));
result.append(", maxVersions => ");
result.append(Integer.toString(maxVersions));
+ result.append(", maxValues => ");
+ result.append(Integer.toString(maxValues));
result.append("}");
return result.toString();
}
Modified: hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/ScannerInstanceResource.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/ScannerInstanceResource.java?rev=930491&r1=930490&r2=930491&view=diff
==============================================================================
--- hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/ScannerInstanceResource.java (original)
+++ hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/ScannerInstanceResource.java Sat Apr 3 08:02:11 2010
@@ -25,6 +25,7 @@ import java.io.IOException;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.CacheControl;
import javax.ws.rs.core.Context;
@@ -50,7 +51,7 @@ public class ScannerInstanceResource imp
User user;
ResultGenerator generator;
String id;
- int batch;
+ int batch = 1;
RESTServlet servlet;
CacheControl cacheControl;
@@ -68,7 +69,9 @@ public class ScannerInstanceResource imp
@GET
@Produces({MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF})
- public Response get(final @Context UriInfo uriInfo) throws IOException {
+ public Response get(final @Context UriInfo uriInfo,
+ @QueryParam("n") int maxRows, final @QueryParam("c") int maxValues)
+ throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("GET " + uriInfo.getAbsolutePath());
}
@@ -76,7 +79,11 @@ public class ScannerInstanceResource imp
CellSetModel model = new CellSetModel();
RowModel rowModel = null;
byte[] rowKey = null;
- int count = batch;
+ int limit = batch;
+ if (maxValues > 0) {
+ limit = maxValues;
+ }
+ int count = limit;
do {
KeyValue value = null;
try {
@@ -89,7 +96,7 @@ public class ScannerInstanceResource imp
LOG.info("generator exhausted");
// respond with 204 (No Content) if an empty cell set would be
// returned
- if (count == batch) {
+ if (count == limit) {
return Response.noContent().build();
}
break;
@@ -105,6 +112,14 @@ public class ScannerInstanceResource imp
generator.putBack(value);
break;
}
+ // if maxRows was given as a query param, stop if we would exceed the
+ // specified number of rows
+ if (maxRows > 0) {
+ if (--maxRows == 0) {
+ generator.putBack(value);
+ break;
+ }
+ }
model.addRow(rowModel);
rowKey = value.getRow();
rowModel = new RowModel(rowKey);
Added: hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/RemoteHTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/RemoteHTable.java?rev=930491&view=auto
==============================================================================
--- hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/RemoteHTable.java (added)
+++ hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/RemoteHTable.java Sat Apr 3 08:02:11 2010
@@ -0,0 +1,549 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.stargate.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.RowLock;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.stargate.Constants;
+import org.apache.hadoop.hbase.stargate.model.CellModel;
+import org.apache.hadoop.hbase.stargate.model.CellSetModel;
+import org.apache.hadoop.hbase.stargate.model.ColumnSchemaModel;
+import org.apache.hadoop.hbase.stargate.model.RowModel;
+import org.apache.hadoop.hbase.stargate.model.ScannerModel;
+import org.apache.hadoop.hbase.stargate.model.TableSchemaModel;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * HTable interface to remote tables accessed via Stargate
+ */
+public class RemoteHTable implements HTableInterface {
+
+ private static final Log LOG = LogFactory.getLog(RemoteHTable.class);
+
+ Client client;
+ Configuration conf;
+ byte[] name;
+ String accessToken;
+
+ @SuppressWarnings("unchecked")
+ protected String buildRowSpec(final byte[] row, final Map familyMap,
+ final long startTime, final long endTime, final int maxVersions) {
+ StringBuffer sb = new StringBuffer();
+ sb.append('/');
+ if (accessToken != null) {
+ sb.append(accessToken);
+ sb.append('/');
+ }
+ sb.append(Bytes.toStringBinary(name));
+ sb.append('/');
+ sb.append(Bytes.toStringBinary(row));
+ Set families = familyMap.entrySet();
+ if (families != null) {
+ Iterator i = familyMap.entrySet().iterator();
+ if (i.hasNext()) {
+ sb.append('/');
+ }
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry)i.next();
+ Collection quals = (Collection)e.getValue();
+ if (quals != null && !quals.isEmpty()) {
+ Iterator ii = quals.iterator();
+ while (ii.hasNext()) {
+ sb.append(Bytes.toStringBinary((byte[])e.getKey()));
+ sb.append(':');
+ Object o = ii.next();
+ // Puts use byte[] but Deletes use KeyValue
+ if (o instanceof byte[]) {
+ sb.append(Bytes.toStringBinary((byte[])o));
+ } else if (o instanceof KeyValue) {
+ sb.append(Bytes.toStringBinary(((KeyValue)o).getQualifier()));
+ } else {
+ throw new RuntimeException("object type not handled");
+ }
+ if (ii.hasNext()) {
+ sb.append(',');
+ }
+ }
+ } else {
+ sb.append(Bytes.toStringBinary((byte[])e.getKey()));
+ sb.append(':');
+ }
+ if (i.hasNext()) {
+ sb.append(',');
+ }
+ }
+ }
+ if (startTime != 0 && endTime != Long.MAX_VALUE) {
+ sb.append('/');
+ sb.append(startTime);
+ if (startTime != endTime) {
+ sb.append(',');
+ sb.append(endTime);
+ }
+ } else if (endTime != Long.MAX_VALUE) {
+ sb.append('/');
+ sb.append(endTime);
+ }
+ if (maxVersions > 1) {
+ sb.append("?v=");
+ sb.append(maxVersions);
+ }
+ return sb.toString();
+ }
+
+ protected Result[] buildResultFromModel(final CellSetModel model) {
+ List<Result> results = new ArrayList<Result>();
+ for (RowModel row: model.getRows()) {
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ for (CellModel cell: row.getCells()) {
+ byte[][] split = KeyValue.parseColumn(cell.getColumn());
+ byte[] column = split[0];
+ byte[] qualifier = split.length > 1 ? split[1] : null;
+ kvs.add(new KeyValue(row.getKey(), column, qualifier,
+ cell.getTimestamp(), cell.getValue()));
+ }
+ results.add(new Result(kvs));
+ }
+ return results.toArray(new Result[results.size()]);
+ }
+
+ protected CellSetModel buildModelFromPut(Put put) {
+ RowModel row = new RowModel(put.getRow());
+ long ts = put.getTimeStamp();
+ for (List<KeyValue> kvs: put.getFamilyMap().values()) {
+ for (KeyValue kv: kvs) {
+ row.addCell(new CellModel(kv.getFamily(), kv.getQualifier(),
+ ts != HConstants.LATEST_TIMESTAMP ? ts : kv.getTimestamp(),
+ kv.getValue()));
+ }
+ }
+ CellSetModel model = new CellSetModel();
+ model.addRow(row);
+ return model;
+ }
+
+ /**
+ * Constructor
+ * @param client
+ * @param name
+ */
+ public RemoteHTable(Client client, String name) {
+ this(client, HBaseConfiguration.create(), Bytes.toBytes(name), null);
+ }
+
+ /**
+ * Constructor
+ * @param client
+ * @param name
+ * @param accessToken
+ */
+ public RemoteHTable(Client client, String name, String accessToken) {
+ this(client, HBaseConfiguration.create(), Bytes.toBytes(name), accessToken);
+ }
+
+ /**
+ * Constructor
+ * @param client
+ * @param conf
+ * @param name
+ * @param accessToken
+ */
+ public RemoteHTable(Client client, Configuration conf, String name,
+ String accessToken) {
+ this(client, conf, Bytes.toBytes(name), accessToken);
+ }
+
+ /**
+ * Constructor
+ * @param conf
+ */
+ public RemoteHTable(Client client, Configuration conf, byte[] name,
+ String accessToken) {
+ this.client = client;
+ this.conf = conf;
+ this.name = name;
+ this.accessToken = accessToken;
+ }
+
+ @Override
+ public byte[] getTableName() {
+ return name.clone();
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override
+ public HTableDescriptor getTableDescriptor() throws IOException {
+ StringBuilder sb = new StringBuilder();
+ sb.append('/');
+ if (accessToken != null) {
+ sb.append(accessToken);
+ sb.append('/');
+ }
+ sb.append(Bytes.toStringBinary(name));
+ sb.append('/');
+ sb.append("schema");
+ Response response = client.get(sb.toString(), Constants.MIMETYPE_PROTOBUF);
+ if (response.getCode() != 200) {
+ throw new IOException("schema request returned " + response.getCode());
+ }
+ TableSchemaModel schema = new TableSchemaModel();
+ schema.getObjectFromMessage(response.getBody());
+ HTableDescriptor htd = new HTableDescriptor(schema.getName());
+ for (Map.Entry<QName, Object> e: schema.getAny().entrySet()) {
+ htd.setValue(e.getKey().getLocalPart(), e.getValue().toString());
+ }
+ for (ColumnSchemaModel column: schema.getColumns()) {
+ HColumnDescriptor hcd = new HColumnDescriptor(column.getName());
+ for (Map.Entry<QName, Object> e: column.getAny().entrySet()) {
+ hcd.setValue(e.getKey().getLocalPart(), e.getValue().toString());
+ }
+ htd.addFamily(hcd);
+ }
+ return htd;
+ }
+
+ @Override
+ public void close() throws IOException {
+ client.shutdown();
+ }
+
+ @Override
+ public Result get(Get get) throws IOException {
+ TimeRange range = get.getTimeRange();
+ String spec = buildRowSpec(get.getRow(), get.getFamilyMap(),
+ range.getMin(), range.getMax(), get.getMaxVersions());
+ if (get.getFilter() != null) {
+ LOG.warn("filters not supported on gets");
+ }
+ Response response = client.get(spec, Constants.MIMETYPE_PROTOBUF);
+ int code = response.getCode();
+ if (code == 404) {
+ return new Result();
+ }
+ if (code != 200) {
+ throw new IOException("get request returned " + code);
+ }
+ CellSetModel model = new CellSetModel();
+ model.getObjectFromMessage(response.getBody());
+ Result[] results = buildResultFromModel(model);
+ if (results.length > 0) {
+ if (results.length > 1) {
+ LOG.warn("too many results for get (" + results.length + ")");
+ }
+ return results[0];
+ }
+ return new Result();
+ }
+
+ @Override
+ public boolean exists(Get get) throws IOException {
+ LOG.warn("exists() is really get(), just use get()");
+ Result result = get(get);
+ return (result != null && !(result.isEmpty()));
+ }
+
+ @Override
+ public void put(Put put) throws IOException {
+ CellSetModel model = buildModelFromPut(put);
+ StringBuilder sb = new StringBuilder();
+ sb.append('/');
+ if (accessToken != null) {
+ sb.append(accessToken);
+ sb.append('/');
+ }
+ sb.append(Bytes.toStringBinary(name));
+ sb.append('/');
+ sb.append(Bytes.toStringBinary(put.getRow()));
+ Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF,
+ model.createProtobufOutput());
+ if (response.getCode() != 200) {
+ throw new IOException("put failed with " + response.getCode());
+ }
+ }
+
+ @Override
+ public void put(List<Put> puts) throws IOException {
+ // this is a trick: Stargate accepts multiple rows in a cell set and
+ // ignores the row specification in the URI
+
+ // separate puts by row
+ TreeMap<byte[],List<KeyValue>> map =
+ new TreeMap<byte[],List<KeyValue>>(Bytes.BYTES_COMPARATOR);
+ for (Put put: puts) {
+ byte[] row = put.getRow();
+ List<KeyValue> kvs = map.get(row);
+ if (kvs == null) {
+ kvs = new ArrayList<KeyValue>();
+ map.put(row, kvs);
+ }
+ for (List<KeyValue> l: put.getFamilyMap().values()) {
+ kvs.addAll(l);
+ }
+ }
+
+ // build the cell set
+ CellSetModel model = new CellSetModel();
+ for (Map.Entry<byte[], List<KeyValue>> e: map.entrySet()) {
+ RowModel row = new RowModel(e.getKey());
+ for (KeyValue kv: e.getValue()) {
+ row.addCell(new CellModel(kv));
+ }
+ model.addRow(row);
+ }
+
+ // build path for multiput
+ StringBuilder sb = new StringBuilder();
+ sb.append('/');
+ if (accessToken != null) {
+ sb.append(accessToken);
+ sb.append('/');
+ }
+ sb.append(Bytes.toStringBinary(name));
+ sb.append("/$multiput"); // can be any nonexistent row
+ Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF,
+ model.createProtobufOutput());
+ if (response.getCode() != 200) {
+ throw new IOException("multiput failed with " + response.getCode());
+ }
+ }
+
+ @Override
+ public void delete(Delete delete) throws IOException {
+ String spec = buildRowSpec(delete.getRow(), delete.getFamilyMap(),
+ delete.getTimeStamp(), delete.getTimeStamp(), 1);
+ Response response = client.delete(spec);
+ if (response.getCode() != 200) {
+ throw new IOException("delete() returned " + response.getCode());
+ }
+ }
+
+ @Override
+ public void delete(List<Delete> deletes) throws IOException {
+ for (Delete delete: deletes) {
+ delete(delete);
+ }
+ }
+
+ @Override
+ public void flushCommits() throws IOException {
+ // no-op
+ }
+
+ class Scanner implements ResultScanner {
+
+ String uri;
+
+ public Scanner(Scan scan) throws IOException {
+ StringBuffer sb = new StringBuffer();
+ sb.append('/');
+ if (accessToken != null) {
+ sb.append(accessToken);
+ sb.append('/');
+ }
+ sb.append(Bytes.toStringBinary(name));
+ sb.append('/');
+ sb.append("scanner");
+ try {
+ ScannerModel model = ScannerModel.fromScan(scan);
+ Response response = client.post(sb.toString(),
+ Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
+ if (response.getCode() != 201) {
+ throw new IOException("scan request failed with " +
+ response.getCode());
+ }
+ uri = response.getLocation();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public Result[] next(int nbRows) throws IOException {
+ StringBuilder sb = new StringBuilder(uri);
+ sb.append("?n=");
+ sb.append(nbRows);
+ Response response = client.get(sb.toString(),
+ Constants.MIMETYPE_PROTOBUF);
+ if (response.getCode() == 206) {
+ return null;
+ }
+ if (response.getCode() != 200) {
+ LOG.error("scanner.next failed with " + response.getCode());
+ return null;
+ }
+ CellSetModel model = new CellSetModel();
+ model.getObjectFromMessage(response.getBody());
+ return buildResultFromModel(model);
+ }
+
+ @Override
+ public Result next() throws IOException {
+ Result[] results = next(1);
+ if (results == null || results.length < 1) {
+ return null;
+ }
+ return results[0];
+ }
+
+ class Iter implements Iterator<Result> {
+
+ Result cache;
+
+ public Iter() {
+ try {
+ cache = Scanner.this.next();
+ } catch (IOException e) {
+ LOG.warn(StringUtils.stringifyException(e));
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return cache != null;
+ }
+
+ @Override
+ public Result next() {
+ Result result = cache;
+ try {
+ cache = Scanner.this.next();
+ } catch (IOException e) {
+ LOG.warn(StringUtils.stringifyException(e));
+ cache = null;
+ }
+ return result;
+ }
+
+ @Override
+ public void remove() {
+ throw new RuntimeException("remove() not supported");
+ }
+
+ }
+
+ @Override
+ public Iterator<Result> iterator() {
+ return new Iter();
+ }
+
+ @Override
+ public void close() {
+ try {
+ client.delete(uri);
+ } catch (IOException e) {
+ LOG.warn(StringUtils.stringifyException(e));
+ }
+ }
+
+ }
+
+ @Override
+ public ResultScanner getScanner(Scan scan) throws IOException {
+ return new Scanner(scan);
+ }
+
+ @Override
+ public ResultScanner getScanner(byte[] family) throws IOException {
+ Scan scan = new Scan();
+ scan.addFamily(family);
+ return new Scanner(scan);
+ }
+
+ @Override
+ public ResultScanner getScanner(byte[] family, byte[] qualifier)
+ throws IOException {
+ Scan scan = new Scan();
+ scan.addColumn(family, qualifier);
+ return new Scanner(scan);
+ }
+
+ @Override
+ public boolean isAutoFlush() {
+ return true;
+ }
+
+ @Override
+ public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
+ throw new IOException("getRowOrBefore not supported");
+ }
+
+ @Override
+ public RowLock lockRow(byte[] row) throws IOException {
+ throw new IOException("lockRow not implemented");
+ }
+
+ @Override
+ public void unlockRow(RowLock rl) throws IOException {
+ throw new IOException("unlockRow not implemented");
+ }
+
+ @Override
+ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+ byte[] value, Put put) throws IOException {
+ throw new IOException("checkAndPut not supported");
+ }
+
+ @Override
+ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+ long amount) throws IOException {
+ throw new IOException("incrementColumnValue not supported");
+ }
+
+ @Override
+ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+ long amount, boolean writeToWAL) throws IOException {
+ throw new IOException("incrementColumnValue not supported");
+ }
+
+}
Modified: hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/model/ScannerModel.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/model/ScannerModel.java?rev=930491&r1=930490&r2=930491&view=diff
==============================================================================
--- hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/model/ScannerModel.java (original)
+++ hadoop/hbase/trunk/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/model/ScannerModel.java Sat Apr 3 08:02:11 2010
@@ -110,11 +110,11 @@ public class ScannerModel implements Pro
private byte[] startRow = HConstants.EMPTY_START_ROW;
private byte[] endRow = HConstants.EMPTY_END_ROW;;
private List<byte[]> columns = new ArrayList<byte[]>();
- private int batch = 1;
+ private int batch = Integer.MAX_VALUE;
private long startTime = 0;
private long endTime = Long.MAX_VALUE;
- private String filter;
- private int maxVersions = 1;
+ private String filter = null;
+ private int maxVersions = Integer.MAX_VALUE;
/**
* @param o the JSONObject under construction
@@ -343,8 +343,14 @@ public class ScannerModel implements Pro
}
model.setStartTime(scan.getTimeRange().getMin());
model.setEndTime(scan.getTimeRange().getMax());
- model.setBatch(scan.getCaching());
- model.setMaxVersions(scan.getMaxVersions());
+ int caching = scan.getCaching();
+ if (caching > 0) {
+ model.setBatch(caching);
+ }
+ int maxVersions = scan.getMaxVersions();
+ if (maxVersions > 0) {
+ model.setMaxVersions(maxVersions);
+ }
Filter filter = scan.getFilter();
if (filter != null) {
model.setFilter(stringifyFilter(new JSONStringer(), filter).toString());
Added: hadoop/hbase/trunk/contrib/stargate/core/src/test/java/org/apache/hadoop/hbase/stargate/client/TestRemoteTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/contrib/stargate/core/src/test/java/org/apache/hadoop/hbase/stargate/client/TestRemoteTable.java?rev=930491&view=auto
==============================================================================
--- hadoop/hbase/trunk/contrib/stargate/core/src/test/java/org/apache/hadoop/hbase/stargate/client/TestRemoteTable.java (added)
+++ hadoop/hbase/trunk/contrib/stargate/core/src/test/java/org/apache/hadoop/hbase/stargate/client/TestRemoteTable.java Sat Apr 3 08:02:11 2010
@@ -0,0 +1,330 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.stargate.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.stargate.MiniClusterTestBase;
+import org.apache.hadoop.hbase.stargate.client.Client;
+import org.apache.hadoop.hbase.stargate.client.Cluster;
+import org.apache.hadoop.hbase.stargate.client.RemoteHTable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class TestRemoteTable extends MiniClusterTestBase {
+
+ static final String TABLE = "TestRemoteTable";
+ static final byte[] ROW_1 = Bytes.toBytes("testrow1");
+ static final byte[] ROW_2 = Bytes.toBytes("testrow2");
+ static final byte[] ROW_3 = Bytes.toBytes("testrow3");
+ static final byte[] ROW_4 = Bytes.toBytes("testrow4");
+ static final byte[] COLUMN_1 = Bytes.toBytes("a");
+ static final byte[] COLUMN_2 = Bytes.toBytes("b");
+ static final byte[] COLUMN_3 = Bytes.toBytes("c");
+ static final byte[] QUALIFIER_1 = Bytes.toBytes("1");
+ static final byte[] QUALIFIER_2 = Bytes.toBytes("2");
+ static final byte[] QUALIFIER_3 = Bytes.toBytes("3");
+ static final byte[] VALUE_1 = Bytes.toBytes("testvalue1");
+ static final byte[] VALUE_2 = Bytes.toBytes("testvalue2");
+ static final byte[] VALUE_3 = Bytes.toBytes("testvalue3");
+
+ static final long ONE_HOUR = 60 * 60 * 1000;
+ static final long TS_2 = System.currentTimeMillis();
+ static final long TS_1 = TS_2 - ONE_HOUR;
+
+ Client client;
+ HBaseAdmin admin;
+ RemoteHTable remoteTable;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ admin = new HBaseAdmin(conf);
+ if (!admin.tableExists(TABLE)) {
+ HTableDescriptor htd = new HTableDescriptor(TABLE);
+ htd.addFamily(new HColumnDescriptor(COLUMN_1));
+ htd.addFamily(new HColumnDescriptor(COLUMN_2));
+ htd.addFamily(new HColumnDescriptor(COLUMN_3));
+ admin.createTable(htd);
+ HTable table = new HTable(TABLE);
+ Put put = new Put(ROW_1);
+ put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_1);
+ table.put(put);
+ put = new Put(ROW_2);
+ put.add(COLUMN_1, QUALIFIER_1, TS_1, VALUE_1);
+ put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_2);
+ put.add(COLUMN_2, QUALIFIER_2, TS_2, VALUE_2);
+ table.put(put);
+ table.flushCommits();
+ }
+ remoteTable = new RemoteHTable(
+ new Client(new Cluster().add("localhost", testServletPort)),
+ conf, TABLE, null);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ remoteTable.close();
+ super.tearDown();
+ }
+
+ public void testGetTableDescriptor() throws IOException {
+ HTableDescriptor local = new HTable(conf, TABLE).getTableDescriptor();
+ assertEquals(remoteTable.getTableDescriptor(), local);
+ }
+
+ public void testGet() throws IOException {
+ Get get = new Get(ROW_1);
+ Result result = remoteTable.get(get);
+ byte[] value1 = result.getValue(COLUMN_1, QUALIFIER_1);
+ byte[] value2 = result.getValue(COLUMN_2, QUALIFIER_2);
+ assertNotNull(value1);
+ assertTrue(Bytes.equals(VALUE_1, value1));
+ assertNull(value2);
+
+ get = new Get(ROW_1);
+ get.addFamily(COLUMN_3);
+ result = remoteTable.get(get);
+ value1 = result.getValue(COLUMN_1, QUALIFIER_1);
+ value2 = result.getValue(COLUMN_2, QUALIFIER_2);
+ assertNull(value1);
+ assertNull(value2);
+
+ get = new Get(ROW_1);
+ get.addColumn(COLUMN_1, QUALIFIER_1);
+ get.addColumn(COLUMN_2, QUALIFIER_2);
+ result = remoteTable.get(get);
+ value1 = result.getValue(COLUMN_1, QUALIFIER_1);
+ value2 = result.getValue(COLUMN_2, QUALIFIER_2);
+ assertNotNull(value1);
+ assertTrue(Bytes.equals(VALUE_1, value1));
+ assertNull(value2);
+
+ get = new Get(ROW_2);
+ result = remoteTable.get(get);
+ value1 = result.getValue(COLUMN_1, QUALIFIER_1);
+ value2 = result.getValue(COLUMN_2, QUALIFIER_2);
+ assertNotNull(value1);
+ assertTrue(Bytes.equals(VALUE_2, value1)); // @TS_2
+ assertNotNull(value2);
+ assertTrue(Bytes.equals(VALUE_2, value2));
+
+ get = new Get(ROW_2);
+ get.addFamily(COLUMN_1);
+ result = remoteTable.get(get);
+ value1 = result.getValue(COLUMN_1, QUALIFIER_1);
+ value2 = result.getValue(COLUMN_2, QUALIFIER_2);
+ assertNotNull(value1);
+ assertTrue(Bytes.equals(VALUE_2, value1)); // @TS_2
+ assertNull(value2);
+
+ get = new Get(ROW_2);
+ get.addColumn(COLUMN_1, QUALIFIER_1);
+ get.addColumn(COLUMN_2, QUALIFIER_2);
+ result = remoteTable.get(get);
+ value1 = result.getValue(COLUMN_1, QUALIFIER_1);
+ value2 = result.getValue(COLUMN_2, QUALIFIER_2);
+ assertNotNull(value1);
+ assertTrue(Bytes.equals(VALUE_2, value1)); // @TS_2
+ assertNotNull(value2);
+ assertTrue(Bytes.equals(VALUE_2, value2));
+
+ // test timestamp
+
+ get = new Get(ROW_2);
+ get.addFamily(COLUMN_1);
+ get.addFamily(COLUMN_2);
+ get.setTimeStamp(TS_1);
+ result = remoteTable.get(get);
+ value1 = result.getValue(COLUMN_1, QUALIFIER_1);
+ value2 = result.getValue(COLUMN_2, QUALIFIER_2);
+ assertNotNull(value1);
+ assertTrue(Bytes.equals(VALUE_1, value1)); // @TS_1
+ assertNull(value2);
+
+ // test timerange
+
+ get = new Get(ROW_2);
+ get.addFamily(COLUMN_1);
+ get.addFamily(COLUMN_2);
+ get.setTimeRange(0, TS_1 + 1);
+ result = remoteTable.get(get);
+ value1 = result.getValue(COLUMN_1, QUALIFIER_1);
+ value2 = result.getValue(COLUMN_2, QUALIFIER_2);
+ assertNotNull(value1);
+ assertTrue(Bytes.equals(VALUE_1, value1)); // @TS_1
+ assertNull(value2);
+
+ // test maxVersions
+
+ get = new Get(ROW_2);
+ get.addFamily(COLUMN_1);
+ get.setMaxVersions(2);
+ result = remoteTable.get(get);
+ int count = 0;
+ for (KeyValue kv: result.list()) {
+ if (Bytes.equals(COLUMN_1, kv.getFamily()) && TS_1 == kv.getTimestamp()) {
+ assertTrue(Bytes.equals(VALUE_1, kv.getValue())); // @TS_1
+ count++;
+ }
+ if (Bytes.equals(COLUMN_1, kv.getFamily()) && TS_2 == kv.getTimestamp()) {
+ assertTrue(Bytes.equals(VALUE_2, kv.getValue())); // @TS_2
+ count++;
+ }
+ }
+ assertEquals(2, count);
+ }
+
+ public void testPut() throws IOException {
+ Put put = new Put(ROW_3);
+ put.add(COLUMN_1, QUALIFIER_1, VALUE_1);
+ remoteTable.put(put);
+
+ Get get = new Get(ROW_3);
+ get.addFamily(COLUMN_1);
+ Result result = remoteTable.get(get);
+ byte[] value = result.getValue(COLUMN_1, QUALIFIER_1);
+ assertNotNull(value);
+ assertTrue(Bytes.equals(VALUE_1, value));
+
+ // multiput
+
+ List<Put> puts = new ArrayList<Put>();
+ put = new Put(ROW_3);
+ put.add(COLUMN_2, QUALIFIER_2, VALUE_2);
+ puts.add(put);
+ put = new Put(ROW_4);
+ put.add(COLUMN_1, QUALIFIER_1, VALUE_1);
+ puts.add(put);
+ put = new Put(ROW_4);
+ put.add(COLUMN_2, QUALIFIER_2, VALUE_2);
+ puts.add(put);
+ remoteTable.put(puts);
+
+ get = new Get(ROW_3);
+ get.addFamily(COLUMN_2);
+ result = remoteTable.get(get);
+ value = result.getValue(COLUMN_2, QUALIFIER_2);
+ assertNotNull(value);
+ assertTrue(Bytes.equals(VALUE_2, value));
+ get = new Get(ROW_4);
+ result = remoteTable.get(get);
+ value = result.getValue(COLUMN_1, QUALIFIER_1);
+ assertNotNull(value);
+ assertTrue(Bytes.equals(VALUE_1, value));
+ value = result.getValue(COLUMN_2, QUALIFIER_2);
+ assertNotNull(value);
+ assertTrue(Bytes.equals(VALUE_2, value));
+ }
+
+ public void testDelete() throws IOException {
+ Put put = new Put(ROW_3);
+ put.add(COLUMN_1, QUALIFIER_1, VALUE_1);
+ put.add(COLUMN_2, QUALIFIER_2, VALUE_2);
+ remoteTable.put(put);
+
+ Get get = new Get(ROW_3);
+ get.addFamily(COLUMN_1);
+ get.addFamily(COLUMN_2);
+ Result result = remoteTable.get(get);
+ byte[] value1 = result.getValue(COLUMN_1, QUALIFIER_1);
+ byte[] value2 = result.getValue(COLUMN_2, QUALIFIER_2);
+ assertNotNull(value1);
+ assertTrue(Bytes.equals(VALUE_1, value1));
+ assertNotNull(value2);
+ assertTrue(Bytes.equals(VALUE_2, value2));
+
+ Delete delete = new Delete(ROW_3);
+ delete.deleteColumn(COLUMN_2, QUALIFIER_2);
+ remoteTable.delete(delete);
+
+ get = new Get(ROW_3);
+ get.addFamily(COLUMN_1);
+ get.addFamily(COLUMN_2);
+ result = remoteTable.get(get);
+ value1 = result.getValue(COLUMN_1, QUALIFIER_1);
+ value2 = result.getValue(COLUMN_2, QUALIFIER_2);
+ assertNotNull(value1);
+ assertTrue(Bytes.equals(VALUE_1, value1));
+ assertNull(value2);
+
+ delete = new Delete(ROW_3);
+ remoteTable.delete(delete);
+
+ get = new Get(ROW_3);
+ get.addFamily(COLUMN_1);
+ get.addFamily(COLUMN_2);
+ result = remoteTable.get(get);
+ value1 = result.getValue(COLUMN_1, QUALIFIER_1);
+ value2 = result.getValue(COLUMN_2, QUALIFIER_2);
+ assertNull(value1);
+ assertNull(value2);
+ }
+
+ public void testScanner() throws IOException {
+ List<Put> puts = new ArrayList<Put>();
+ Put put = new Put(ROW_1);
+ put.add(COLUMN_1, QUALIFIER_1, VALUE_1);
+ puts.add(put);
+ put = new Put(ROW_2);
+ put.add(COLUMN_1, QUALIFIER_1, VALUE_1);
+ puts.add(put);
+ put = new Put(ROW_3);
+ put.add(COLUMN_1, QUALIFIER_1, VALUE_1);
+ puts.add(put);
+ put = new Put(ROW_4);
+ put.add(COLUMN_1, QUALIFIER_1, VALUE_1);
+ puts.add(put);
+ remoteTable.put(puts);
+
+ ResultScanner scanner = remoteTable.getScanner(new Scan());
+
+ Result[] results = scanner.next(1);
+ assertNotNull(results);
+ assertEquals(1, results.length);
+ assertTrue(Bytes.equals(ROW_1, results[0].getRow()));
+
+ results = scanner.next(3);
+ assertNotNull(results);
+ assertEquals(3, results.length);
+ assertTrue(Bytes.equals(ROW_2, results[0].getRow()));
+ assertTrue(Bytes.equals(ROW_3, results[1].getRow()));
+ assertTrue(Bytes.equals(ROW_4, results[2].getRow()));
+
+ results = scanner.next(1);
+ assertNull(results);
+
+ scanner.close();
+ }
+
+}