You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2014/01/17 03:33:16 UTC
svn commit: r1558995 - in /hbase/branches/0.98:
hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/
hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/
hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/
hbase-serv...
Author: ndimiduk
Date: Fri Jan 17 02:33:16 2014
New Revision: 1558995
URL: http://svn.apache.org/r1558995
Log:
HBASE-9343 Implement stateless scanner for Stargate (Vandana Ayyalasomayajula)
Added:
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java
hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestTableScan.java
Modified:
hbase/branches/0.98/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java
hbase/branches/0.98/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java
hbase/branches/0.98/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java
hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java
hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java
Modified: hbase/branches/0.98/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java (original)
+++ hbase/branches/0.98/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java Fri Jan 17 02:33:16 2014
@@ -46,6 +46,10 @@ public interface MetricsRESTSource exten
String FAILED_PUT_KEY = "failedPut";
String FAILED_DELETE_KEY = "failedDelete";
+
+ String SUCCESSFUL_SCAN_KEY = "successfulScanCount";
+
+ String FAILED_SCAN_KEY = "failedScanCount";
/**
* Increment the number of requests
@@ -95,4 +99,18 @@ public interface MetricsRESTSource exten
* @param inc The number of failed delete requests.
*/
void incrementFailedDeleteRequests(int inc);
+
+ /**
+ * Increment the number of successful scan requests.
+ *
+ * @param inc Number of successful scan requests.
+ */
+ void incrementSucessfulScanRequests(final int inc);
+
+ /**
+ * Increment the number failed scan requests.
+ *
+ * @param inc the inc
+ */
+ void incrementFailedScanRequests(final int inc);
}
Modified: hbase/branches/0.98/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java (original)
+++ hbase/branches/0.98/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java Fri Jan 17 02:33:16 2014
@@ -33,9 +33,11 @@ public class MetricsRESTSourceImpl exten
private MetricMutableCounterLong sucGet;
private MetricMutableCounterLong sucPut;
private MetricMutableCounterLong sucDel;
+ private MetricMutableCounterLong sucScan;
private MetricMutableCounterLong fGet;
private MetricMutableCounterLong fPut;
private MetricMutableCounterLong fDel;
+ private MetricMutableCounterLong fScan;
public MetricsRESTSourceImpl() {
this(METRICS_NAME, METRICS_DESCRIPTION, CONTEXT, JMX_CONTEXT);
@@ -56,10 +58,12 @@ public class MetricsRESTSourceImpl exten
sucGet = getMetricsRegistry().getLongCounter(SUCCESSFUL_GET_KEY, 0l);
sucPut = getMetricsRegistry().getLongCounter(SUCCESSFUL_PUT_KEY, 0l);
sucDel = getMetricsRegistry().getLongCounter(SUCCESSFUL_DELETE_KEY, 0l);
+ sucScan = getMetricsRegistry().getLongCounter(SUCCESSFUL_SCAN_KEY, 0L);
fGet = getMetricsRegistry().getLongCounter(FAILED_GET_KEY, 0l);
fPut = getMetricsRegistry().getLongCounter(FAILED_PUT_KEY, 0l);
fDel = getMetricsRegistry().getLongCounter(FAILED_DELETE_KEY, 0l);
+ fScan = getMetricsRegistry().getLongCounter(FAILED_SCAN_KEY, 0l);
}
@Override
@@ -96,4 +100,14 @@ public class MetricsRESTSourceImpl exten
public void incrementFailedDeleteRequests(int inc) {
fDel.incr(inc);
}
+
+ @Override
+ public void incrementSucessfulScanRequests(int inc) {
+ sucScan.incr(inc);
+ }
+
+ @Override
+ public void incrementFailedScanRequests(int inc) {
+ fScan.incr(inc);
+ }
}
Modified: hbase/branches/0.98/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java (original)
+++ hbase/branches/0.98/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java Fri Jan 17 02:33:16 2014
@@ -35,9 +35,11 @@ public class MetricsRESTSourceImpl exten
private MutableCounterLong sucGet;
private MutableCounterLong sucPut;
private MutableCounterLong sucDel;
+ private MutableCounterLong sucScan;
private MutableCounterLong fGet;
private MutableCounterLong fPut;
private MutableCounterLong fDel;
+ private MutableCounterLong fScan;
public MetricsRESTSourceImpl() {
this(METRICS_NAME, METRICS_DESCRIPTION, CONTEXT, JMX_CONTEXT);
@@ -58,10 +60,12 @@ public class MetricsRESTSourceImpl exten
sucGet = getMetricsRegistry().getLongCounter(SUCCESSFUL_GET_KEY, 0l);
sucPut = getMetricsRegistry().getLongCounter(SUCCESSFUL_PUT_KEY, 0l);
sucDel = getMetricsRegistry().getLongCounter(SUCCESSFUL_DELETE_KEY, 0l);
+ sucScan = getMetricsRegistry().getLongCounter(SUCCESSFUL_SCAN_KEY, 0L);
fGet = getMetricsRegistry().getLongCounter(FAILED_GET_KEY, 0l);
fPut = getMetricsRegistry().getLongCounter(FAILED_PUT_KEY, 0l);
fDel = getMetricsRegistry().getLongCounter(FAILED_DELETE_KEY, 0l);
+ fScan = getMetricsRegistry().getLongCounter(FAILED_SCAN_KEY, 0l);
}
@Override
@@ -98,4 +102,14 @@ public class MetricsRESTSourceImpl exten
public void incrementFailedDeleteRequests(int inc) {
fDel.incr(inc);
}
+
+ @Override
+ public void incrementSucessfulScanRequests(int inc) {
+ sucScan.incr(inc);
+ }
+
+ @Override
+ public void incrementFailedScanRequests(int inc) {
+ fScan.incr(inc);
+ }
}
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java Fri Jan 17 02:33:16 2014
@@ -57,5 +57,16 @@ public interface Constants {
static final String REST_DNS_NAMESERVER = "hbase.rest.dns.nameserver";
static final String REST_DNS_INTERFACE = "hbase.rest.dns.interface";
+
public static final String FILTER_CLASSES = "hbase.rest.filter.classes";
+ public static final String SCAN_START_ROW = "startrow";
+ public static final String SCAN_END_ROW = "endrow";
+ public static final String SCAN_COLUMN = "column";
+ public static final String SCAN_START_TIME = "starttime";
+ public static final String SCAN_END_TIME = "endtime";
+ public static final String SCAN_MAX_VERSIONS = "maxversions";
+ public static final String SCAN_BATCH_SIZE = "batchsize";
+ public static final String SCAN_LIMIT = "limit";
+ public static final String SCAN_FETCH_SIZE = "hbase.rest.scan.fetchsize";
+
}
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java Fri Jan 17 02:33:16 2014
@@ -78,12 +78,26 @@ public class MetricsREST {
public void incrementSucessfulDeleteRequests(final int inc) {
source.incrementSucessfulDeleteRequests(inc);
}
-
+
/**
* @param inc How much to add to failedDeleteCount.
*/
public void incrementFailedDeleteRequests(final int inc) {
source.incrementFailedDeleteRequests(inc);
}
-
+
+ /**
+ * @param inc How much to add to sucessfulScanCount.
+ */
+ public synchronized void incrementSucessfulScanRequests(final int inc) {
+ source.incrementSucessfulScanRequests(inc);
+ }
+
+ /**
+ * @param inc How much to add to failedScanCount.
+ */
+ public void incrementFailedScanRequests(final int inc) {
+ source.incrementFailedScanRequests(inc);
+ }
+
}
Added: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java?rev=1558995&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java (added)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java Fri Jan 17 02:33:16 2014
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.rest;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.StreamingOutput;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.rest.model.CellModel;
+import org.apache.hadoop.hbase.rest.model.CellSetModel;
+import org.apache.hadoop.hbase.rest.model.RowModel;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+public class ProtobufStreamingUtil implements StreamingOutput {
+
+ private static final Log LOG = LogFactory.getLog(ProtobufStreamingUtil.class);
+ private String contentType;
+ private ResultScanner resultScanner;
+ private int limit;
+ private int fetchSize;
+
+ protected ProtobufStreamingUtil(ResultScanner scanner, String type, int limit, int fetchSize) {
+ this.resultScanner = scanner;
+ this.contentType = type;
+ this.limit = limit;
+ this.fetchSize = fetchSize;
+ LOG.debug("Created ScanStreamingUtil with content type = " + this.contentType + " user limit : "
+ + this.limit + " scan fetch size : " + this.fetchSize);
+ }
+
+ @Override
+ public void write(OutputStream outStream) throws IOException, WebApplicationException {
+ Result[] rowsToSend;
+ if(limit < fetchSize){
+ rowsToSend = this.resultScanner.next(limit);
+ writeToStream(createModelFromResults(rowsToSend), this.contentType, outStream);
+ } else {
+ int count = limit;
+ while (count > 0) {
+ if (count < fetchSize) {
+ rowsToSend = this.resultScanner.next(count);
+ } else {
+ rowsToSend = this.resultScanner.next(this.fetchSize);
+ }
+ if(rowsToSend.length == 0){
+ break;
+ }
+ count = count - rowsToSend.length;
+ writeToStream(createModelFromResults(rowsToSend), this.contentType, outStream);
+ }
+ }
+ }
+
+ private void writeToStream(CellSetModel model, String contentType, OutputStream outStream)
+ throws IOException {
+ byte[] objectBytes = model.createProtobufOutput();
+ outStream.write(Bytes.toBytes((short)objectBytes.length));
+ outStream.write(objectBytes);
+ outStream.flush();
+ LOG.trace("Wrote " + model.getRows().size() + " rows to stream successfully.");
+ }
+
+ private CellSetModel createModelFromResults(Result[] results) {
+ CellSetModel cellSetModel = new CellSetModel();
+ for (Result rs : results) {
+ byte[] rowKey = rs.getRow();
+ RowModel rModel = new RowModel(rowKey);
+ List<Cell> kvs = rs.listCells();
+ for (Cell kv : kvs) {
+ rModel.addCell(new CellModel(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), kv
+ .getTimestamp(), CellUtil.cloneValue(kv)));
+ }
+ cellSetModel.addRow(rModel);
+ }
+ return cellSetModel;
+ }
+}
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java Fri Jan 17 02:33:16 2014
@@ -20,18 +20,32 @@
package org.apache.hadoop.hbase.rest;
import java.io.IOException;
+import java.util.List;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
+import javax.ws.rs.HeaderParam;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.util.Bytes;
@InterfaceAudience.Private
public class TableResource extends ResourceBase {
String table;
+ private static final Log LOG = LogFactory.getLog(TableResource.class);
/**
* Constructor
@@ -82,7 +96,7 @@ public class TableResource extends Resou
return new MultiRowResource(this, versions);
}
- @Path("{rowspec: .+}")
+ @Path("{rowspec: [^*]+}")
public RowResource getRowResource(
// We need the @Encoded decorator so Jersey won't urldecode before
// the RowSpec constructor has a chance to parse
@@ -91,4 +105,76 @@ public class TableResource extends Resou
final @QueryParam("check") String check) throws IOException {
return new RowResource(this, rowspec, versions, check);
}
+
+ @Path("{suffixglobbingspec: .*\\*/.+}")
+ public RowResource getRowResourceWithSuffixGlobbing(
+ // We need the @Encoded decorator so Jersey won't urldecode before
+ // the RowSpec constructor has a chance to parse
+ final @PathParam("suffixglobbingspec") @Encoded String suffixglobbingspec,
+ final @QueryParam("v") String versions,
+ final @QueryParam("check") String check) throws IOException {
+ return new RowResource(this, suffixglobbingspec, versions, check);
+ }
+
+ @Path("{scanspec: .*[*]$}")
+ public TableScanResource getScanResource(
+ final @Context UriInfo uriInfo,
+ final @PathParam("scanspec") String scanSpec,
+ final @HeaderParam("Accept") String contentType,
+ @DefaultValue(Integer.MAX_VALUE + "")
+ @QueryParam(Constants.SCAN_LIMIT) int userRequestedLimit,
+ @DefaultValue("") @QueryParam(Constants.SCAN_START_ROW) String startRow,
+ @DefaultValue("") @QueryParam(Constants.SCAN_END_ROW) String endRow,
+ @DefaultValue("") @QueryParam(Constants.SCAN_COLUMN) List<String> column,
+ @DefaultValue("1") @QueryParam(Constants.SCAN_MAX_VERSIONS) int maxVersions,
+ @DefaultValue("-1") @QueryParam(Constants.SCAN_BATCH_SIZE) int batchSize,
+ @DefaultValue("0") @QueryParam(Constants.SCAN_START_TIME) long startTime,
+ @DefaultValue(Long.MAX_VALUE + "") @QueryParam(Constants.SCAN_END_TIME) long endTime,
+ @DefaultValue("true") @QueryParam(Constants.SCAN_BATCH_SIZE) boolean cacheBlocks) {
+ try {
+ Filter filter = null;
+ if (scanSpec.indexOf('*') > 0) {
+ String prefix = scanSpec.substring(0, scanSpec.indexOf('*'));
+ filter = new PrefixFilter(Bytes.toBytes(prefix));
+ }
+ LOG.debug("Query parameters : Table Name = > " + this.table + " Start Row => " + startRow
+ + " End Row => " + endRow + " Columns => " + column + " Start Time => " + startTime
+ + " End Time => " + endTime + " Cache Blocks => " + cacheBlocks + " Max Versions => "
+ + maxVersions + " Batch Size => " + batchSize);
+ HTableInterface hTable = RESTServlet.getInstance().getTable(this.table);
+ Scan tableScan = new Scan();
+ tableScan.setBatch(batchSize);
+ tableScan.setMaxVersions(maxVersions);
+ tableScan.setTimeRange(startTime, endTime);
+ tableScan.setStartRow(Bytes.toBytes(startRow));
+ tableScan.setStopRow(Bytes.toBytes(endRow));
+ for (String csplit : column) {
+ String[] familysplit = csplit.trim().split(":");
+ if (familysplit.length == 2) {
+ if (familysplit[1].length() > 0) {
+ LOG.debug("Scan family and column : " + familysplit[0] + " " + familysplit[1]);
+ tableScan.addColumn(Bytes.toBytes(familysplit[0]), Bytes.toBytes(familysplit[1]));
+ } else {
+ tableScan.addFamily(Bytes.toBytes(familysplit[0]));
+ LOG.debug("Scan family : " + familysplit[0] + " and empty qualifier.");
+ tableScan.addColumn(Bytes.toBytes(familysplit[0]), null);
+ }
+ } else if (StringUtils.isNotEmpty(familysplit[0])){
+ LOG.debug("Scan family : " + familysplit[0]);
+ tableScan.addFamily(Bytes.toBytes(familysplit[0]));
+ }
+ }
+ if (filter != null) {
+ tableScan.setFilter(filter);
+ }
+ int fetchSize = this.servlet.getConfiguration().getInt(Constants.SCAN_FETCH_SIZE, 10);
+ tableScan.setCaching(fetchSize);
+ return new TableScanResource(hTable.getScanner(tableScan), userRequestedLimit);
+ } catch (Exception exp) {
+ servlet.getMetrics().incrementFailedScanRequests(1);
+ processException(exp);
+ LOG.warn(exp);
+ return null;
+ }
+ }
}
Added: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java?rev=1558995&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java (added)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java Fri Jan 17 02:33:16 2014
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.rest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.core.UriInfo;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.rest.model.CellModel;
+import org.apache.hadoop.hbase.rest.model.RowModel;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+@InterfaceAudience.Private
+public class TableScanResource extends ResourceBase {
+
+ private static final Log LOG = LogFactory.getLog(TableScanResource.class);
+ TableResource tableResource;
+ ResultScanner results;
+ int userRequestedLimit;
+
+ public TableScanResource(ResultScanner scanner, int userRequestedLimit) throws IOException {
+ super();
+ this.results = scanner;
+ this.userRequestedLimit = userRequestedLimit;
+ }
+
+ @GET
+ @Produces({ Constants.MIMETYPE_XML, Constants.MIMETYPE_JSON })
+ public CellSetModelStream get(final @Context UriInfo uriInfo) {
+ servlet.getMetrics().incrementRequests(1);
+ final int rowsToSend = userRequestedLimit;
+ servlet.getMetrics().incrementSucessfulScanRequests(1);
+ final Iterator<Result> itr = results.iterator();
+ return new CellSetModelStream(new ArrayList<RowModel>() {
+ public Iterator<RowModel> iterator() {
+ return new Iterator<RowModel>() {
+ int count = rowsToSend;
+
+ @Override
+ public boolean hasNext() {
+ if (count > 0) {
+ return itr.hasNext();
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException(
+ "Remove method cannot be used in CellSetModelStream");
+ }
+
+ @Override
+ public RowModel next() {
+ Result rs = itr.next();
+ if ((rs == null) || (count <= 0)) {
+ return null;
+ }
+ byte[] rowKey = rs.getRow();
+ RowModel rModel = new RowModel(rowKey);
+ List<Cell> kvs = rs.listCells();
+ for (Cell kv : kvs) {
+ rModel.addCell(new CellModel(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
+ kv.getTimestamp(), CellUtil.cloneValue(kv)));
+ }
+ count--;
+ return rModel;
+ }
+ };
+ }
+ });
+ }
+
+ @GET
+ @Produces({ Constants.MIMETYPE_PROTOBUF, Constants.MIMETYPE_PROTOBUF_IETF })
+ public Response getProtobuf(
+ final @Context UriInfo uriInfo,
+ final @PathParam("scanspec") String scanSpec,
+ final @HeaderParam("Accept") String contentType,
+ @DefaultValue(Integer.MAX_VALUE + "") @QueryParam(Constants.SCAN_LIMIT) int userRequestedLimit,
+ @DefaultValue("") @QueryParam(Constants.SCAN_START_ROW) String startRow,
+ @DefaultValue("") @QueryParam(Constants.SCAN_END_ROW) String endRow,
+ @DefaultValue("column") @QueryParam(Constants.SCAN_COLUMN) List<String> column,
+ @DefaultValue("1") @QueryParam(Constants.SCAN_MAX_VERSIONS) int maxVersions,
+ @DefaultValue("-1") @QueryParam(Constants.SCAN_BATCH_SIZE) int batchSize,
+ @DefaultValue("0") @QueryParam(Constants.SCAN_START_TIME) long startTime,
+ @DefaultValue(Long.MAX_VALUE + "") @QueryParam(Constants.SCAN_END_TIME) long endTime,
+ @DefaultValue("true") @QueryParam(Constants.SCAN_BATCH_SIZE) boolean cacheBlocks) {
+ servlet.getMetrics().incrementRequests(1);
+ try {
+ int fetchSize = this.servlet.getConfiguration().getInt(Constants.SCAN_FETCH_SIZE, 10);
+ ProtobufStreamingUtil stream = new ProtobufStreamingUtil(this.results, contentType,
+ userRequestedLimit, fetchSize);
+ servlet.getMetrics().incrementSucessfulScanRequests(1);
+ ResponseBuilder response = Response.ok(stream);
+ response.header("content-type", contentType);
+ return response.build();
+ } catch (Exception exp) {
+ servlet.getMetrics().incrementFailedScanRequests(1);
+ processException(exp);
+ LOG.warn(exp);
+ return null;
+ }
+ }
+
+ @XmlRootElement(name = "CellSet")
+ @XmlAccessorType(XmlAccessType.FIELD)
+ public static class CellSetModelStream {
+ // JAXB needs an arraylist for streaming
+ @XmlElement(name = "Row")
+ @JsonIgnore
+ private ArrayList<RowModel> Row;
+
+ public CellSetModelStream() {
+ }
+
+ public CellSetModelStream(final ArrayList<RowModel> rowList) {
+ this.Row = rowList;
+ }
+
+ // jackson needs an iterator for streaming
+ @JsonProperty("Row")
+ public Iterator<RowModel> getIterator() {
+ return Row.iterator();
+ }
+ }
+}
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java Fri Jan 17 02:33:16 2014
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.rest.client;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -333,7 +334,8 @@ public class Client {
int code = execute(c, method, headers, path);
headers = method.getResponseHeaders();
byte[] body = method.getResponseBody();
- return new Response(code, headers, body);
+ InputStream in = method.getResponseBodyAsStream();
+ return new Response(code, headers, body, in);
} finally {
method.releaseConnection();
}
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java Fri Jan 17 02:33:16 2014
@@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.rest.client;
+import java.io.InputStream;
+
import org.apache.commons.httpclient.Header;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -32,6 +34,7 @@ public class Response {
private int code;
private Header[] headers;
private byte[] body;
+ private InputStream stream;
/**
* Constructor
@@ -61,6 +64,20 @@ public class Response {
this.headers = headers;
this.body = body;
}
+
+ /**
+ * Constructor
+ * @param code the HTTP response code
+ * @param headers headers the HTTP response headers
+ * @param body the response body, can be null
+ * @param in Inputstream if the response had one.
+ */
+ public Response(int code, Header[] headers, byte[] body, InputStream in) {
+ this.code = code;
+ this.headers = headers;
+ this.body = body;
+ this.stream = in;
+ }
/**
* @return the HTTP response code
@@ -68,6 +85,15 @@ public class Response {
public int getCode() {
return code;
}
+
+ /**
+ * Gets the input stream instance.
+ *
+ * @return an instance of InputStream class.
+ */
+ public InputStream getStream(){
+ return this.stream;
+ }
/**
* @return the HTTP response headers
Modified: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java Fri Jan 17 02:33:16 2014
@@ -482,5 +482,103 @@ public class TestGetAndPutResource exten
}
return contains;
}
+
+ @Test
+ public void testSuffixGlobbingXMLWithNewScanner() throws IOException, JAXBException {
+ String path = "/" + TABLE + "/fakerow"; // deliberate nonexistent row
+
+ CellSetModel cellSetModel = new CellSetModel();
+ RowModel rowModel = new RowModel(ROW_1);
+ rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_1),
+ Bytes.toBytes(VALUE_1)));
+ rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_2),
+ Bytes.toBytes(VALUE_2)));
+ cellSetModel.addRow(rowModel);
+ rowModel = new RowModel(ROW_2);
+ rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_1),
+ Bytes.toBytes(VALUE_3)));
+ rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_2),
+ Bytes.toBytes(VALUE_4)));
+ cellSetModel.addRow(rowModel);
+ StringWriter writer = new StringWriter();
+ xmlMarshaller.marshal(cellSetModel, writer);
+ Response response = client.put(path, Constants.MIMETYPE_XML,
+ Bytes.toBytes(writer.toString()));
+ Thread.yield();
+
+ // make sure the fake row was not actually created
+ response = client.get(path, Constants.MIMETYPE_XML);
+ assertEquals(response.getCode(), 404);
+
+ // check that all of the values were created
+ StringBuilder query = new StringBuilder();
+ query.append('/');
+ query.append(TABLE);
+ query.append('/');
+ query.append("testrow*");
+ response = client.get(query.toString(), Constants.MIMETYPE_XML);
+ assertEquals(response.getCode(), 200);
+ assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
+ CellSetModel cellSet = (CellSetModel)
+ xmlUnmarshaller.unmarshal(new ByteArrayInputStream(response.getBody()));
+ assertTrue(cellSet.getRows().size() == 2);
+
+ response = deleteRow(TABLE, ROW_1);
+ assertEquals(response.getCode(), 200);
+ response = deleteRow(TABLE, ROW_2);
+ assertEquals(response.getCode(), 200);
+ }
+
+ @Test
+ public void testSuffixGlobbingXML() throws IOException, JAXBException {
+ String path = "/" + TABLE + "/fakerow"; // deliberate nonexistent row
+
+ CellSetModel cellSetModel = new CellSetModel();
+ RowModel rowModel = new RowModel(ROW_1);
+ rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_1),
+ Bytes.toBytes(VALUE_1)));
+ rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_2),
+ Bytes.toBytes(VALUE_2)));
+ cellSetModel.addRow(rowModel);
+ rowModel = new RowModel(ROW_2);
+ rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_1),
+ Bytes.toBytes(VALUE_3)));
+ rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_2),
+ Bytes.toBytes(VALUE_4)));
+ cellSetModel.addRow(rowModel);
+ StringWriter writer = new StringWriter();
+ xmlMarshaller.marshal(cellSetModel, writer);
+ Response response = client.put(path, Constants.MIMETYPE_XML,
+ Bytes.toBytes(writer.toString()));
+ Thread.yield();
+
+ // make sure the fake row was not actually created
+ response = client.get(path, Constants.MIMETYPE_XML);
+ assertEquals(response.getCode(), 404);
+
+ // check that all of the values were created
+ StringBuilder query = new StringBuilder();
+ query.append('/');
+ query.append(TABLE);
+ query.append('/');
+ query.append("testrow*");
+ query.append('/');
+ query.append(COLUMN_1);
+ response = client.get(query.toString(), Constants.MIMETYPE_XML);
+ assertEquals(response.getCode(), 200);
+ assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
+ CellSetModel cellSet = (CellSetModel)
+ xmlUnmarshaller.unmarshal(new ByteArrayInputStream(response.getBody()));
+ List<RowModel> rows = cellSet.getRows();
+ assertTrue(rows.size() == 2);
+ for (RowModel row : rows) {
+ assertTrue(row.getCells().size() == 1);
+ assertEquals(COLUMN_1, Bytes.toString(row.getCells().get(0).getColumn()));
+ }
+ response = deleteRow(TABLE, ROW_1);
+ assertEquals(response.getCode(), 200);
+ response = deleteRow(TABLE, ROW_2);
+ assertEquals(response.getCode(), 200);
+ }
}
Modified: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java Fri Jan 17 02:33:16 2014
@@ -73,11 +73,11 @@ public class TestScannerResource {
private static int expectedRows2;
private static Configuration conf;
- private static int insertData(String tableName, String column, double prob)
+ static int insertData(Configuration conf, String tableName, String column, double prob)
throws IOException {
Random rng = new Random();
int count = 0;
- HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+ HTable table = new HTable(conf, tableName);
byte[] k = new byte[3];
byte [][] famAndQf = KeyValue.parseColumn(Bytes.toBytes(column));
for (byte b1 = 'a'; b1 < 'z'; b1++) {
@@ -97,10 +97,11 @@ public class TestScannerResource {
}
}
table.flushCommits();
+ table.close();
return count;
}
- private static int countCellSet(CellSetModel model) {
+ static int countCellSet(CellSetModel model) {
int count = 0;
Iterator<RowModel> rows = model.getRows().iterator();
while (rows.hasNext()) {
@@ -170,8 +171,8 @@ public class TestScannerResource {
htd.addFamily(new HColumnDescriptor(CFA));
htd.addFamily(new HColumnDescriptor(CFB));
admin.createTable(htd);
- expectedRows1 = insertData(TABLE, COLUMN_1, 1.0);
- expectedRows2 = insertData(TABLE, COLUMN_2, 0.5);
+ expectedRows1 = insertData(TEST_UTIL.getConfiguration(), TABLE, COLUMN_1, 1.0);
+ expectedRows2 = insertData(TEST_UTIL.getConfiguration(), TABLE, COLUMN_2, 0.5);
}
@AfterClass
Added: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestTableScan.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestTableScan.java?rev=1558995&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestTableScan.java (added)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestTableScan.java Fri Jan 17 02:33:16 2014
@@ -0,0 +1,508 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.rest;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.ws.rs.core.MediaType;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.parsers.SAXParserFactory;
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.rest.client.Client;
+import org.apache.hadoop.hbase.rest.client.Cluster;
+import org.apache.hadoop.hbase.rest.client.Response;
+import org.apache.hadoop.hbase.rest.model.CellModel;
+import org.apache.hadoop.hbase.rest.model.CellSetModel;
+import org.apache.hadoop.hbase.rest.model.RowModel;
+import org.apache.hadoop.hbase.rest.provider.JacksonProvider;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.xml.sax.InputSource;
+import org.xml.sax.XMLReader;
+
+@Category(MediumTests.class)
+public class TestTableScan {
+
+ private static final String TABLE = "TestScanResource";
+ private static final String CFA = "a";
+ private static final String CFB = "b";
+ private static final String COLUMN_1 = CFA + ":1";
+ private static final String COLUMN_2 = CFB + ":2";
+ private static Client client;
+ private static int expectedRows1;
+ private static int expectedRows2;
+ private static Configuration conf;
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final HBaseRESTTestingUtility REST_TEST_UTIL =
+ new HBaseRESTTestingUtility();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf = TEST_UTIL.getConfiguration();
+ TEST_UTIL.startMiniCluster();
+ REST_TEST_UTIL.startServletContainer(conf);
+ client = new Client(new Cluster().add("localhost",
+ REST_TEST_UTIL.getServletPort()));
+ HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+ if (!admin.tableExists(TABLE)) {
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
+ htd.addFamily(new HColumnDescriptor(CFA));
+ htd.addFamily(new HColumnDescriptor(CFB));
+ admin.createTable(htd);
+ expectedRows1 = TestScannerResource.insertData(conf, TABLE, COLUMN_1, 1.0);
+ expectedRows2 = TestScannerResource.insertData(conf, TABLE, COLUMN_2, 0.5);
+ }
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.getHBaseAdmin().disableTable(TABLE);
+ TEST_UTIL.getHBaseAdmin().deleteTable(TABLE);
+ REST_TEST_UTIL.shutdownServletContainer();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testSimpleScannerXML() throws IOException, JAXBException, XMLStreamException {
+ // Test scanning particular columns
+ StringBuilder builder = new StringBuilder();
+ builder.append("/*");
+ builder.append("?");
+ builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+ builder.append("&");
+ builder.append(Constants.SCAN_LIMIT + "=10");
+ Response response = client.get("/" + TABLE + builder.toString(),
+ Constants.MIMETYPE_XML);
+ assertEquals(200, response.getCode());
+ assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
+ JAXBContext ctx = JAXBContext.newInstance(CellSetModel.class);
+ Unmarshaller ush = ctx.createUnmarshaller();
+ CellSetModel model = (CellSetModel) ush.unmarshal(response.getStream());
+ int count = TestScannerResource.countCellSet(model);
+ assertEquals(10, count);
+ checkRowsNotNull(model);
+
+ //Test with no limit.
+ builder = new StringBuilder();
+ builder.append("/*");
+ builder.append("?");
+ builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+ response = client.get("/" + TABLE + builder.toString(),
+ Constants.MIMETYPE_XML);
+ assertEquals(200, response.getCode());
+ assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
+ model = (CellSetModel) ush.unmarshal(response.getStream());
+ count = TestScannerResource.countCellSet(model);
+ assertEquals(expectedRows1, count);
+ checkRowsNotNull(model);
+
+ //Test with start and end row.
+ builder = new StringBuilder();
+ builder.append("/*");
+ builder.append("?");
+ builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+ builder.append("&");
+ builder.append(Constants.SCAN_START_ROW + "=aaa");
+ builder.append("&");
+ builder.append(Constants.SCAN_END_ROW + "=aay");
+ response = client.get("/" + TABLE + builder.toString(),
+ Constants.MIMETYPE_XML);
+ assertEquals(200, response.getCode());
+ model = (CellSetModel) ush.unmarshal(response.getStream());
+ count = TestScannerResource.countCellSet(model);
+ RowModel startRow = model.getRows().get(0);
+ assertEquals("aaa", Bytes.toString(startRow.getKey()));
+ RowModel endRow = model.getRows().get(model.getRows().size() - 1);
+ assertEquals("aax", Bytes.toString(endRow.getKey()));
+ assertEquals(24, count);
+ checkRowsNotNull(model);
+
+ //Test with start row and limit.
+ builder = new StringBuilder();
+ builder.append("/*");
+ builder.append("?");
+ builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+ builder.append("&");
+ builder.append(Constants.SCAN_START_ROW + "=aaa");
+ builder.append("&");
+ builder.append(Constants.SCAN_LIMIT + "=15");
+ response = client.get("/" + TABLE + builder.toString(),
+ Constants.MIMETYPE_XML);
+ assertEquals(200, response.getCode());
+ assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
+ model = (CellSetModel) ush.unmarshal(response.getStream());
+ startRow = model.getRows().get(0);
+ assertEquals("aaa", Bytes.toString(startRow.getKey()));
+ count = TestScannerResource.countCellSet(model);
+ assertEquals(15, count);
+ checkRowsNotNull(model);
+
+ }
+
+ @Test
+ public void testSimpleScannerJson() throws IOException, JAXBException {
+ // Test scanning particular columns with limit.
+ StringBuilder builder = new StringBuilder();
+ builder.append("/*");
+ builder.append("?");
+ builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+ builder.append("&");
+ builder.append(Constants.SCAN_LIMIT + "=20");
+ Response response = client.get("/" + TABLE + builder.toString(),
+ Constants.MIMETYPE_JSON);
+ assertEquals(200, response.getCode());
+ assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
+ ObjectMapper mapper = new JacksonProvider()
+ .locateMapper(CellSetModel.class, MediaType.APPLICATION_JSON_TYPE);
+ CellSetModel model = mapper.readValue(response.getStream(), CellSetModel.class);
+ int count = TestScannerResource.countCellSet(model);
+ assertEquals(20, count);
+ checkRowsNotNull(model);
+
+ //Test scanning with no limit.
+ builder = new StringBuilder();
+ builder.append("/*");
+ builder.append("?");
+ builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_2);
+ response = client.get("/" + TABLE + builder.toString(),
+ Constants.MIMETYPE_JSON);
+ assertEquals(200, response.getCode());
+ assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
+ model = mapper.readValue(response.getStream(), CellSetModel.class);
+ count = TestScannerResource.countCellSet(model);
+ assertEquals(expectedRows2, count);
+ checkRowsNotNull(model);
+
+ //Test with start row and end row.
+ builder = new StringBuilder();
+ builder.append("/*");
+ builder.append("?");
+ builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+ builder.append("&");
+ builder.append(Constants.SCAN_START_ROW + "=aaa");
+ builder.append("&");
+ builder.append(Constants.SCAN_END_ROW + "=aay");
+ response = client.get("/" + TABLE + builder.toString(),
+ Constants.MIMETYPE_JSON);
+ assertEquals(200, response.getCode());
+ model = mapper.readValue(response.getStream(), CellSetModel.class);
+ RowModel startRow = model.getRows().get(0);
+ assertEquals("aaa", Bytes.toString(startRow.getKey()));
+ RowModel endRow = model.getRows().get(model.getRows().size() - 1);
+ assertEquals("aax", Bytes.toString(endRow.getKey()));
+ count = TestScannerResource.countCellSet(model);
+ assertEquals(24, count);
+ checkRowsNotNull(model);
+ }
+
+ /**
+ * An example to scan using listener in unmarshaller for XML.
+ * @throws Exception the exception
+ */
+ @Test
+ public void testScanUsingListenerUnmarshallerXML() throws Exception {
+ StringBuilder builder = new StringBuilder();
+ builder.append("/*");
+ builder.append("?");
+ builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+ builder.append("&");
+ builder.append(Constants.SCAN_LIMIT + "=10");
+ Response response = client.get("/" + TABLE + builder.toString(),
+ Constants.MIMETYPE_XML);
+ assertEquals(200, response.getCode());
+ assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
+ JAXBContext context = JAXBContext.newInstance(ClientSideCellSetModel.class, RowModel.class,
+ CellModel.class);
+ Unmarshaller unmarshaller = context.createUnmarshaller();
+
+ final ClientSideCellSetModel.Listener listener = new ClientSideCellSetModel.Listener() {
+ @Override
+ public void handleRowModel(ClientSideCellSetModel helper, RowModel row) {
+ assertTrue(row.getKey() != null);
+ assertTrue(row.getCells().size() > 0);
+ }
+ };
+
+ // install the callback on all ClientSideCellSetModel instances
+ unmarshaller.setListener(new Unmarshaller.Listener() {
+ public void beforeUnmarshal(Object target, Object parent) {
+ if (target instanceof ClientSideCellSetModel) {
+ ((ClientSideCellSetModel) target).setCellSetModelListener(listener);
+ }
+ }
+
+ public void afterUnmarshal(Object target, Object parent) {
+ if (target instanceof ClientSideCellSetModel) {
+ ((ClientSideCellSetModel) target).setCellSetModelListener(null);
+ }
+ }
+ });
+
+ // create a new XML parser
+ SAXParserFactory factory = SAXParserFactory.newInstance();
+ factory.setNamespaceAware(true);
+ XMLReader reader = factory.newSAXParser().getXMLReader();
+ reader.setContentHandler(unmarshaller.getUnmarshallerHandler());
+ assertFalse(ClientSideCellSetModel.listenerInvoked);
+ reader.parse(new InputSource(response.getStream()));
+ assertTrue(ClientSideCellSetModel.listenerInvoked);
+
+ }
+
+ @Test
+ public void testStreamingJSON() throws Exception {
+ // Test scanning particular columns with limit.
+ StringBuilder builder = new StringBuilder();
+ builder.append("/*");
+ builder.append("?");
+ builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+ builder.append("&");
+ builder.append(Constants.SCAN_LIMIT + "=20");
+ Response response = client.get("/" + TABLE + builder.toString(),
+ Constants.MIMETYPE_JSON);
+ assertEquals(200, response.getCode());
+ assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
+ ObjectMapper mapper = new JacksonProvider()
+ .locateMapper(CellSetModel.class, MediaType.APPLICATION_JSON_TYPE);
+ CellSetModel model = mapper.readValue(response.getStream(), CellSetModel.class);
+ int count = TestScannerResource.countCellSet(model);
+ assertEquals(20, count);
+ checkRowsNotNull(model);
+
+ //Test scanning with no limit.
+ builder = new StringBuilder();
+ builder.append("/*");
+ builder.append("?");
+ builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_2);
+ response = client.get("/" + TABLE + builder.toString(),
+ Constants.MIMETYPE_JSON);
+ assertEquals(200, response.getCode());
+ assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
+ model = mapper.readValue(response.getStream(), CellSetModel.class);
+ count = TestScannerResource.countCellSet(model);
+ assertEquals(expectedRows2, count);
+ checkRowsNotNull(model);
+
+ //Test with start row and end row.
+ builder = new StringBuilder();
+ builder.append("/*");
+ builder.append("?");
+ builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+ builder.append("&");
+ builder.append(Constants.SCAN_START_ROW + "=aaa");
+ builder.append("&");
+ builder.append(Constants.SCAN_END_ROW + "=aay");
+ response = client.get("/" + TABLE + builder.toString(),
+ Constants.MIMETYPE_JSON);
+ assertEquals(200, response.getCode());
+
+ count = 0;
+ JsonFactory jfactory = new JsonFactory(mapper);
+ JsonParser jParser = jfactory.createJsonParser(response.getStream());
+ boolean found = false;
+ while (jParser.nextToken() != JsonToken.END_OBJECT) {
+ if(jParser.getCurrentToken() == JsonToken.START_OBJECT && found) {
+ RowModel row = jParser.readValueAs(RowModel.class);
+ assertNotNull(row.getKey());
+ for (int i = 0; i < row.getCells().size(); i++) {
+ if (count == 0) {
+ assertEquals("aaa", Bytes.toString(row.getKey()));
+ }
+ if (count == 23) {
+ assertEquals("aax", Bytes.toString(row.getKey()));
+ }
+ count++;
+ }
+ jParser.skipChildren();
+ } else {
+ found = jParser.getCurrentToken() == JsonToken.START_ARRAY;
+ }
+ }
+ assertEquals(24, count);
+ }
+
+ @Test
+ public void testSimpleScannerProtobuf() throws Exception {
+ StringBuilder builder = new StringBuilder();
+ builder.append("/*");
+ builder.append("?");
+ builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+ builder.append("&");
+ builder.append(Constants.SCAN_LIMIT + "=15");
+ Response response = client.get("/" + TABLE + builder.toString(),
+ Constants.MIMETYPE_PROTOBUF);
+ assertEquals(200, response.getCode());
+ assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type"));
+ int rowCount = readProtobufStream(response.getStream());
+ assertEquals(15, rowCount);
+
+ //Test with start row and end row.
+ builder = new StringBuilder();
+ builder.append("/*");
+ builder.append("?");
+ builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+ builder.append("&");
+ builder.append(Constants.SCAN_START_ROW + "=aaa");
+ builder.append("&");
+ builder.append(Constants.SCAN_END_ROW + "=aay");
+ response = client.get("/" + TABLE + builder.toString(),
+ Constants.MIMETYPE_PROTOBUF);
+ assertEquals(200, response.getCode());
+ assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type"));
+ rowCount = readProtobufStream(response.getStream());
+ assertEquals(24, rowCount);
+ }
+
+ private void checkRowsNotNull(CellSetModel model) {
+ for (RowModel row: model.getRows()) {
+ assertTrue(row.getKey() != null);
+ assertTrue(row.getCells().size() > 0);
+ }
+ }
+
+ /**
+ * Read protobuf stream.
+ * @param inputStream the input stream
+ * @return The number of rows in the cell set model.
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public int readProtobufStream(InputStream inputStream) throws IOException{
+ DataInputStream stream = new DataInputStream(inputStream);
+ CellSetModel model = null;
+ int rowCount = 0;
+ try {
+ while (true) {
+ byte[] lengthBytes = new byte[2];
+ int readBytes = stream.read(lengthBytes);
+ if (readBytes == -1) {
+ break;
+ }
+ assertEquals(2, readBytes);
+ int length = Bytes.toShort(lengthBytes);
+ byte[] cellset = new byte[length];
+ stream.read(cellset);
+ model = new CellSetModel();
+ model.getObjectFromMessage(cellset);
+ checkRowsNotNull(model);
+ rowCount = rowCount + TestScannerResource.countCellSet(model);
+ }
+ } catch (EOFException exp) {
+ exp.printStackTrace();
+ } finally {
+ stream.close();
+ }
+ return rowCount;
+ }
+
+ @Test
+ public void testScanningUnknownColumnJson() throws IOException, JAXBException {
+ // Test scanning particular columns with limit.
+ StringBuilder builder = new StringBuilder();
+ builder.append("/*");
+ builder.append("?");
+ builder.append(Constants.SCAN_COLUMN + "=a:test");
+ Response response = client.get("/" + TABLE + builder.toString(),
+ Constants.MIMETYPE_JSON);
+ assertEquals(200, response.getCode());
+ assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
+ ObjectMapper mapper = new JacksonProvider().locateMapper(CellSetModel.class,
+ MediaType.APPLICATION_JSON_TYPE);
+ CellSetModel model = mapper.readValue(response.getStream(), CellSetModel.class);
+ int count = TestScannerResource.countCellSet(model);
+ assertEquals(0, count);
+ }
+
+ /**
+ * The Class ClientSideCellSetModel which mimics cell set model, and contains listener to perform
+ * user defined operations on the row model.
+ */
+ @XmlRootElement(name = "CellSet")
+ @XmlAccessorType(XmlAccessType.FIELD)
+ public static class ClientSideCellSetModel implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * This list is not a real list; instead it will notify a listener whenever JAXB has
+ * unmarshalled the next row.
+ */
+ @XmlElement(name="Row")
+ private List<RowModel> row;
+
+ static boolean listenerInvoked = false;
+
+ /**
+ * Install a listener for row model on this object. If l is null, the listener
+ * is removed again.
+ */
+ public void setCellSetModelListener(final Listener l) {
+ row = (l == null) ? null : new ArrayList<RowModel>() {
+ private static final long serialVersionUID = 1L;
+
+ public boolean add(RowModel o) {
+ l.handleRowModel(ClientSideCellSetModel.this, o);
+ listenerInvoked = true;
+ return false;
+ }
+ };
+ }
+
+ /**
+ * This listener is invoked every time a new row model is unmarshalled.
+ */
+ public static interface Listener {
+ void handleRowModel(ClientSideCellSetModel helper, RowModel rowModel);
+ }
+ }
+}
+
+
+