You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2014/01/17 03:33:16 UTC

svn commit: r1558995 - in /hbase/branches/0.98: hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/ hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/ hbase-serv...

Author: ndimiduk
Date: Fri Jan 17 02:33:16 2014
New Revision: 1558995

URL: http://svn.apache.org/r1558995
Log:
HBASE-9343 Implement stateless scanner for Stargate (Vandana Ayyalasomayajula)

Added:
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java
    hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestTableScan.java
Modified:
    hbase/branches/0.98/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java
    hbase/branches/0.98/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java
    hbase/branches/0.98/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java
    hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java
    hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java

Modified: hbase/branches/0.98/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java (original)
+++ hbase/branches/0.98/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java Fri Jan 17 02:33:16 2014
@@ -46,6 +46,10 @@ public interface MetricsRESTSource exten
   String FAILED_PUT_KEY = "failedPut";
 
   String FAILED_DELETE_KEY = "failedDelete";
+  
+  String SUCCESSFUL_SCAN_KEY = "successfulScanCount";
+  
+  String FAILED_SCAN_KEY = "failedScanCount";
 
   /**
    * Increment the number of requests
@@ -95,4 +99,18 @@ public interface MetricsRESTSource exten
    * @param inc The number of failed delete requests.
    */
   void incrementFailedDeleteRequests(int inc);
+  
+  /**
+   * Increment the number of successful scan requests.
+   *
+   * @param inc Number of successful scan requests.
+   */
+  void incrementSucessfulScanRequests(final int inc);
+  
+  /**
+   * Increment the number failed scan requests.
+   *
+   * @param inc the inc
+   */
+  void incrementFailedScanRequests(final int inc);
 }

Modified: hbase/branches/0.98/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java (original)
+++ hbase/branches/0.98/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java Fri Jan 17 02:33:16 2014
@@ -33,9 +33,11 @@ public class MetricsRESTSourceImpl exten
   private MetricMutableCounterLong sucGet;
   private MetricMutableCounterLong sucPut;
   private MetricMutableCounterLong sucDel;
+  private MetricMutableCounterLong sucScan;
   private MetricMutableCounterLong fGet;
   private MetricMutableCounterLong fPut;
   private MetricMutableCounterLong fDel;
+  private MetricMutableCounterLong fScan;
 
   public MetricsRESTSourceImpl() {
     this(METRICS_NAME, METRICS_DESCRIPTION, CONTEXT, JMX_CONTEXT);
@@ -56,10 +58,12 @@ public class MetricsRESTSourceImpl exten
     sucGet = getMetricsRegistry().getLongCounter(SUCCESSFUL_GET_KEY, 0l);
     sucPut = getMetricsRegistry().getLongCounter(SUCCESSFUL_PUT_KEY, 0l);
     sucDel = getMetricsRegistry().getLongCounter(SUCCESSFUL_DELETE_KEY, 0l);
+    sucScan = getMetricsRegistry().getLongCounter(SUCCESSFUL_SCAN_KEY, 0L);
 
     fGet = getMetricsRegistry().getLongCounter(FAILED_GET_KEY, 0l);
     fPut = getMetricsRegistry().getLongCounter(FAILED_PUT_KEY, 0l);
     fDel = getMetricsRegistry().getLongCounter(FAILED_DELETE_KEY, 0l);
+    fScan = getMetricsRegistry().getLongCounter(FAILED_SCAN_KEY, 0l);
   }
 
   @Override
@@ -96,4 +100,14 @@ public class MetricsRESTSourceImpl exten
   public void incrementFailedDeleteRequests(int inc) {
     fDel.incr(inc);
   }
+
+  @Override
+  public void incrementSucessfulScanRequests(int inc) {
+    sucScan.incr(inc);
+  }
+
+  @Override
+  public void incrementFailedScanRequests(int inc) {
+    fScan.incr(inc);
+  }
 }

Modified: hbase/branches/0.98/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java (original)
+++ hbase/branches/0.98/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java Fri Jan 17 02:33:16 2014
@@ -35,9 +35,11 @@ public class MetricsRESTSourceImpl exten
   private MutableCounterLong sucGet;
   private MutableCounterLong sucPut;
   private MutableCounterLong sucDel;
+  private MutableCounterLong sucScan;
   private MutableCounterLong fGet;
   private MutableCounterLong fPut;
   private MutableCounterLong fDel;
+  private MutableCounterLong fScan;
 
   public MetricsRESTSourceImpl() {
     this(METRICS_NAME, METRICS_DESCRIPTION, CONTEXT, JMX_CONTEXT);
@@ -58,10 +60,12 @@ public class MetricsRESTSourceImpl exten
     sucGet = getMetricsRegistry().getLongCounter(SUCCESSFUL_GET_KEY, 0l);
     sucPut = getMetricsRegistry().getLongCounter(SUCCESSFUL_PUT_KEY, 0l);
     sucDel = getMetricsRegistry().getLongCounter(SUCCESSFUL_DELETE_KEY, 0l);
+    sucScan = getMetricsRegistry().getLongCounter(SUCCESSFUL_SCAN_KEY, 0L);
 
     fGet = getMetricsRegistry().getLongCounter(FAILED_GET_KEY, 0l);
     fPut = getMetricsRegistry().getLongCounter(FAILED_PUT_KEY, 0l);
     fDel = getMetricsRegistry().getLongCounter(FAILED_DELETE_KEY, 0l);
+    fScan = getMetricsRegistry().getLongCounter(FAILED_SCAN_KEY, 0l);
   }
 
   @Override
@@ -98,4 +102,14 @@ public class MetricsRESTSourceImpl exten
   public void incrementFailedDeleteRequests(int inc) {
     fDel.incr(inc);
   }
+
+  @Override
+  public void incrementSucessfulScanRequests(int inc) {
+    sucScan.incr(inc);
+  }
+
+  @Override
+  public void incrementFailedScanRequests(int inc) {
+   fScan.incr(inc);
+  }
 }

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java Fri Jan 17 02:33:16 2014
@@ -57,5 +57,16 @@ public interface Constants {
 
   static final String REST_DNS_NAMESERVER = "hbase.rest.dns.nameserver";
   static final String REST_DNS_INTERFACE = "hbase.rest.dns.interface";
+
   public static final String FILTER_CLASSES = "hbase.rest.filter.classes";
+  public static final String SCAN_START_ROW = "startrow";
+  public static final String SCAN_END_ROW = "endrow";
+  public static final String SCAN_COLUMN = "column";
+  public static final String SCAN_START_TIME = "starttime";
+  public static final String SCAN_END_TIME = "endtime";
+  public static final String SCAN_MAX_VERSIONS = "maxversions";
+  public static final String SCAN_BATCH_SIZE = "batchsize";
+  public static final String SCAN_LIMIT = "limit";
+  public static final String SCAN_FETCH_SIZE = "hbase.rest.scan.fetchsize";
+
 }

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java Fri Jan 17 02:33:16 2014
@@ -78,12 +78,26 @@ public class MetricsREST {
   public void incrementSucessfulDeleteRequests(final int inc) {
     source.incrementSucessfulDeleteRequests(inc);
   }
-  
+
   /**
    * @param inc How much to add to failedDeleteCount.
    */
   public void incrementFailedDeleteRequests(final int inc) {
     source.incrementFailedDeleteRequests(inc);
   }
-  
+
+  /**
+   * @param inc How much to add to sucessfulScanCount.
+   */
+  public synchronized void incrementSucessfulScanRequests(final int inc) {
+    source.incrementSucessfulScanRequests(inc);
+  }
+
+  /**
+   * @param inc How much to add to failedScanCount.
+   */
+  public void incrementFailedScanRequests(final int inc) {
+    source.incrementFailedScanRequests(inc);
+  }
+
 }

Added: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java?rev=1558995&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java (added)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java Fri Jan 17 02:33:16 2014
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.rest;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.StreamingOutput;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.rest.model.CellModel;
+import org.apache.hadoop.hbase.rest.model.CellSetModel;
+import org.apache.hadoop.hbase.rest.model.RowModel;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+public class ProtobufStreamingUtil implements StreamingOutput {
+
+  private static final Log LOG = LogFactory.getLog(ProtobufStreamingUtil.class);
+  private String contentType;
+  private ResultScanner resultScanner;
+  private int limit;
+  private int fetchSize;
+
+  protected ProtobufStreamingUtil(ResultScanner scanner, String type, int limit, int fetchSize) {
+    this.resultScanner = scanner;
+    this.contentType = type;
+    this.limit = limit;
+    this.fetchSize = fetchSize;
+    LOG.debug("Created ScanStreamingUtil with content type = " + this.contentType + " user limit : "
+        + this.limit + " scan fetch size : " + this.fetchSize);
+  }
+
+  @Override
+  public void write(OutputStream outStream) throws IOException, WebApplicationException {
+    Result[] rowsToSend;
+    if(limit < fetchSize){
+      rowsToSend = this.resultScanner.next(limit);
+      writeToStream(createModelFromResults(rowsToSend), this.contentType, outStream);
+    } else {
+      int count = limit;
+      while (count > 0) {
+        if (count < fetchSize) {
+          rowsToSend = this.resultScanner.next(count);
+        } else {
+          rowsToSend = this.resultScanner.next(this.fetchSize);
+        }
+        if(rowsToSend.length == 0){
+          break;
+        }
+        count = count - rowsToSend.length;
+        writeToStream(createModelFromResults(rowsToSend), this.contentType, outStream);
+      }
+    }
+  }
+
+  private void writeToStream(CellSetModel model, String contentType, OutputStream outStream)
+      throws IOException {
+    byte[] objectBytes = model.createProtobufOutput();
+    outStream.write(Bytes.toBytes((short)objectBytes.length));
+    outStream.write(objectBytes);
+    outStream.flush();
+    LOG.trace("Wrote " + model.getRows().size() + " rows to stream successfully.");
+  }
+
+  private CellSetModel createModelFromResults(Result[] results) {
+    CellSetModel cellSetModel = new CellSetModel();
+    for (Result rs : results) {
+      byte[] rowKey = rs.getRow();
+      RowModel rModel = new RowModel(rowKey);
+      List<Cell> kvs = rs.listCells();
+      for (Cell kv : kvs) {
+        rModel.addCell(new CellModel(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), kv
+            .getTimestamp(), CellUtil.cloneValue(kv)));
+      }
+      cellSetModel.addRow(rModel);
+    }
+    return cellSetModel;
+  }
+}

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java Fri Jan 17 02:33:16 2014
@@ -20,18 +20,32 @@
 package org.apache.hadoop.hbase.rest;
 
 import java.io.IOException;
+import java.util.List;
 
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.Encoded;
+import javax.ws.rs.HeaderParam;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.util.Bytes;
 
 @InterfaceAudience.Private
 public class TableResource extends ResourceBase {
 
   String table;
+  private static final Log LOG = LogFactory.getLog(TableResource.class);
 
   /**
    * Constructor
@@ -82,7 +96,7 @@ public class TableResource extends Resou
     return new MultiRowResource(this, versions);
   }
 
-  @Path("{rowspec: .+}")
+  @Path("{rowspec: [^*]+}")
   public RowResource getRowResource(
       // We need the @Encoded decorator so Jersey won't urldecode before
       // the RowSpec constructor has a chance to parse
@@ -91,4 +105,76 @@ public class TableResource extends Resou
       final @QueryParam("check") String check) throws IOException {
     return new RowResource(this, rowspec, versions, check);
   }
+
+  @Path("{suffixglobbingspec: .*\\*/.+}")
+  public RowResource getRowResourceWithSuffixGlobbing(
+      // We need the @Encoded decorator so Jersey won't urldecode before
+      // the RowSpec constructor has a chance to parse
+      final @PathParam("suffixglobbingspec") @Encoded String suffixglobbingspec,
+      final @QueryParam("v") String versions,
+      final @QueryParam("check") String check) throws IOException {
+    return new RowResource(this, suffixglobbingspec, versions, check);
+  }
+
+  @Path("{scanspec: .*[*]$}")
+  public TableScanResource  getScanResource(
+      final @Context UriInfo uriInfo,
+      final @PathParam("scanspec") String scanSpec,
+      final @HeaderParam("Accept") String contentType,
+      @DefaultValue(Integer.MAX_VALUE + "")
+      @QueryParam(Constants.SCAN_LIMIT) int userRequestedLimit,
+      @DefaultValue("") @QueryParam(Constants.SCAN_START_ROW) String startRow,
+      @DefaultValue("") @QueryParam(Constants.SCAN_END_ROW) String endRow,
+      @DefaultValue("") @QueryParam(Constants.SCAN_COLUMN) List<String> column,
+      @DefaultValue("1") @QueryParam(Constants.SCAN_MAX_VERSIONS) int maxVersions,
+      @DefaultValue("-1") @QueryParam(Constants.SCAN_BATCH_SIZE) int batchSize,
+      @DefaultValue("0") @QueryParam(Constants.SCAN_START_TIME) long startTime,
+      @DefaultValue(Long.MAX_VALUE + "") @QueryParam(Constants.SCAN_END_TIME) long endTime,
+      @DefaultValue("true") @QueryParam(Constants.SCAN_BATCH_SIZE) boolean cacheBlocks) {
+    try {
+      Filter filter = null;
+      if (scanSpec.indexOf('*') > 0) {
+        String prefix = scanSpec.substring(0, scanSpec.indexOf('*'));
+        filter = new PrefixFilter(Bytes.toBytes(prefix));
+      }
+      LOG.debug("Query parameters  : Table Name = > " + this.table + " Start Row => " + startRow
+          + " End Row => " + endRow + " Columns => " + column + " Start Time => " + startTime
+          + " End Time => " + endTime + " Cache Blocks => " + cacheBlocks + " Max Versions => "
+          + maxVersions + " Batch Size => " + batchSize);
+      HTableInterface hTable = RESTServlet.getInstance().getTable(this.table);
+      Scan tableScan = new Scan();
+      tableScan.setBatch(batchSize);
+      tableScan.setMaxVersions(maxVersions);
+      tableScan.setTimeRange(startTime, endTime);
+      tableScan.setStartRow(Bytes.toBytes(startRow));
+      tableScan.setStopRow(Bytes.toBytes(endRow));
+      for (String csplit : column) {
+        String[] familysplit = csplit.trim().split(":");
+        if (familysplit.length == 2) {
+          if (familysplit[1].length() > 0) {
+            LOG.debug("Scan family and column : " + familysplit[0] + "  " + familysplit[1]);
+            tableScan.addColumn(Bytes.toBytes(familysplit[0]), Bytes.toBytes(familysplit[1]));
+          } else {
+            tableScan.addFamily(Bytes.toBytes(familysplit[0]));
+            LOG.debug("Scan family : " + familysplit[0] + " and empty qualifier.");
+            tableScan.addColumn(Bytes.toBytes(familysplit[0]), null);
+          }
+        } else if (StringUtils.isNotEmpty(familysplit[0])){
+          LOG.debug("Scan family : " + familysplit[0]);
+          tableScan.addFamily(Bytes.toBytes(familysplit[0]));
+        }
+      }
+      if (filter != null) {
+        tableScan.setFilter(filter);
+      }
+      int fetchSize = this.servlet.getConfiguration().getInt(Constants.SCAN_FETCH_SIZE, 10);
+      tableScan.setCaching(fetchSize);
+     return new TableScanResource(hTable.getScanner(tableScan), userRequestedLimit);
+    } catch (Exception exp) {
+      servlet.getMetrics().incrementFailedScanRequests(1);
+      processException(exp);
+      LOG.warn(exp);
+      return null;
+    }
+  }
 }

Added: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java?rev=1558995&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java (added)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java Fri Jan 17 02:33:16 2014
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.rest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.core.UriInfo;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.rest.model.CellModel;
+import org.apache.hadoop.hbase.rest.model.RowModel;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+@InterfaceAudience.Private
+public class TableScanResource  extends ResourceBase {
+
+  private static final Log LOG = LogFactory.getLog(TableScanResource.class);
+  TableResource tableResource;
+  ResultScanner results;
+  int userRequestedLimit;
+
+  public TableScanResource(ResultScanner scanner, int userRequestedLimit) throws IOException {
+    super();
+    this.results = scanner;
+    this.userRequestedLimit = userRequestedLimit;
+  }
+
+  @GET
+  @Produces({ Constants.MIMETYPE_XML, Constants.MIMETYPE_JSON })
+  public CellSetModelStream get(final @Context UriInfo uriInfo) {
+    servlet.getMetrics().incrementRequests(1);
+    final int rowsToSend = userRequestedLimit;
+    servlet.getMetrics().incrementSucessfulScanRequests(1);
+    final Iterator<Result> itr = results.iterator();
+    return new CellSetModelStream(new ArrayList<RowModel>() {
+      public Iterator<RowModel> iterator() {
+        return new Iterator<RowModel>() {
+          int count = rowsToSend;
+
+          @Override
+          public boolean hasNext() {
+            if (count > 0) {
+              return itr.hasNext();
+            } else {
+              return false;
+            }
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException(
+                "Remove method cannot be used in CellSetModelStream");
+          }
+
+          @Override
+          public RowModel next() {
+            Result rs = itr.next();
+            if ((rs == null) || (count <= 0)) {
+              return null;
+            }
+            byte[] rowKey = rs.getRow();
+            RowModel rModel = new RowModel(rowKey);
+            List<Cell> kvs = rs.listCells();
+            for (Cell kv : kvs) {
+              rModel.addCell(new CellModel(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
+                  kv.getTimestamp(), CellUtil.cloneValue(kv)));
+            }
+            count--;
+            return rModel;
+          }
+        };
+      }
+    });
+  }
+
+  @GET
+  @Produces({ Constants.MIMETYPE_PROTOBUF, Constants.MIMETYPE_PROTOBUF_IETF })
+  public Response getProtobuf(
+      final @Context UriInfo uriInfo,
+      final @PathParam("scanspec") String scanSpec,
+      final @HeaderParam("Accept") String contentType,
+      @DefaultValue(Integer.MAX_VALUE + "") @QueryParam(Constants.SCAN_LIMIT) int userRequestedLimit,
+      @DefaultValue("") @QueryParam(Constants.SCAN_START_ROW) String startRow,
+      @DefaultValue("") @QueryParam(Constants.SCAN_END_ROW) String endRow,
+      @DefaultValue("column") @QueryParam(Constants.SCAN_COLUMN) List<String> column,
+      @DefaultValue("1") @QueryParam(Constants.SCAN_MAX_VERSIONS) int maxVersions,
+      @DefaultValue("-1") @QueryParam(Constants.SCAN_BATCH_SIZE) int batchSize,
+      @DefaultValue("0") @QueryParam(Constants.SCAN_START_TIME) long startTime,
+      @DefaultValue(Long.MAX_VALUE + "") @QueryParam(Constants.SCAN_END_TIME) long endTime,
+      @DefaultValue("true") @QueryParam(Constants.SCAN_BATCH_SIZE) boolean cacheBlocks) {
+    servlet.getMetrics().incrementRequests(1);
+    try {
+      int fetchSize = this.servlet.getConfiguration().getInt(Constants.SCAN_FETCH_SIZE, 10);
+      ProtobufStreamingUtil stream = new ProtobufStreamingUtil(this.results, contentType,
+          userRequestedLimit, fetchSize);
+      servlet.getMetrics().incrementSucessfulScanRequests(1);
+      ResponseBuilder response = Response.ok(stream);
+      response.header("content-type", contentType);
+      return response.build();
+    } catch (Exception exp) {
+      servlet.getMetrics().incrementFailedScanRequests(1);
+      processException(exp);
+      LOG.warn(exp);
+      return null;
+    }
+  }
+
+  @XmlRootElement(name = "CellSet")
+  @XmlAccessorType(XmlAccessType.FIELD)
+  public static class CellSetModelStream {
+    // JAXB needs an arraylist for streaming
+    @XmlElement(name = "Row")
+    @JsonIgnore
+    private ArrayList<RowModel> Row;
+
+    public CellSetModelStream() {
+    }
+
+    public CellSetModelStream(final ArrayList<RowModel> rowList) {
+      this.Row = rowList;
+    }
+
+    // jackson needs an iterator for streaming
+    @JsonProperty("Row")
+    public Iterator<RowModel> getIterator() {
+      return Row.iterator();
+    }
+  }
+}

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java Fri Jan 17 02:33:16 2014
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.rest.client;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -333,7 +334,8 @@ public class Client {
       int code = execute(c, method, headers, path);
       headers = method.getResponseHeaders();
       byte[] body = method.getResponseBody();
-      return new Response(code, headers, body);
+      InputStream in = method.getResponseBodyAsStream();
+      return new Response(code, headers, body, in);
     } finally {
       method.releaseConnection();
     }

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java Fri Jan 17 02:33:16 2014
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hbase.rest.client;
 
+import java.io.InputStream;
+
 import org.apache.commons.httpclient.Header;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -32,6 +34,7 @@ public class Response {
   private int code;
   private Header[] headers;
   private byte[] body;
+  private InputStream stream;
 
   /**
    * Constructor
@@ -61,6 +64,20 @@ public class Response {
     this.headers = headers;
     this.body = body;
   }
+  
+  /**
+   * Constructor
+   * @param code the HTTP response code
+   * @param headers headers the HTTP response headers
+   * @param body the response body, can be null
+   * @param in Inputstream if the response had one.
+   */
+  public Response(int code, Header[] headers, byte[] body, InputStream in) {
+    this.code = code;
+    this.headers = headers;
+    this.body = body;
+    this.stream = in;
+  }
 
   /**
    * @return the HTTP response code
@@ -68,6 +85,15 @@ public class Response {
   public int getCode() {
     return code;
   }
+  
+  /**
+   * Gets the input stream instance.
+   *
+   * @return an instance of InputStream class.
+   */
+  public InputStream getStream(){
+    return this.stream;
+  }
 
   /**
    * @return the HTTP response headers

Modified: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java Fri Jan 17 02:33:16 2014
@@ -482,5 +482,103 @@ public class TestGetAndPutResource exten
     }
     return contains;
   }
+
+  @Test
+  public void testSuffixGlobbingXMLWithNewScanner() throws IOException, JAXBException {
+    String path = "/" + TABLE + "/fakerow";  // deliberate nonexistent row
+
+    CellSetModel cellSetModel = new CellSetModel();
+    RowModel rowModel = new RowModel(ROW_1);
+    rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_1),
+      Bytes.toBytes(VALUE_1)));
+    rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_2),
+      Bytes.toBytes(VALUE_2)));
+    cellSetModel.addRow(rowModel);
+    rowModel = new RowModel(ROW_2);
+    rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_1),
+      Bytes.toBytes(VALUE_3)));
+    rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_2),
+      Bytes.toBytes(VALUE_4)));
+    cellSetModel.addRow(rowModel);
+    StringWriter writer = new StringWriter();
+    xmlMarshaller.marshal(cellSetModel, writer);
+    Response response = client.put(path, Constants.MIMETYPE_XML,
+      Bytes.toBytes(writer.toString()));
+    Thread.yield();
+
+    // make sure the fake row was not actually created
+    response = client.get(path, Constants.MIMETYPE_XML);
+    assertEquals(response.getCode(), 404);
+
+    // check that all of the values were created
+    StringBuilder query = new StringBuilder();
+    query.append('/');
+    query.append(TABLE);
+    query.append('/');
+    query.append("testrow*");
+    response = client.get(query.toString(), Constants.MIMETYPE_XML);
+    assertEquals(response.getCode(), 200);
+    assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
+    CellSetModel cellSet = (CellSetModel)
+      xmlUnmarshaller.unmarshal(new ByteArrayInputStream(response.getBody()));
+    assertTrue(cellSet.getRows().size() == 2);
+
+    response = deleteRow(TABLE, ROW_1);
+    assertEquals(response.getCode(), 200);
+    response = deleteRow(TABLE, ROW_2);
+    assertEquals(response.getCode(), 200);
+  }
+
+  @Test
+  public void testSuffixGlobbingXML() throws IOException, JAXBException {
+    String path = "/" + TABLE + "/fakerow";  // deliberate nonexistent row
+
+    CellSetModel cellSetModel = new CellSetModel();
+    RowModel rowModel = new RowModel(ROW_1);
+    rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_1),
+      Bytes.toBytes(VALUE_1)));
+    rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_2),
+      Bytes.toBytes(VALUE_2)));
+    cellSetModel.addRow(rowModel);
+    rowModel = new RowModel(ROW_2);
+    rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_1),
+      Bytes.toBytes(VALUE_3)));
+    rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_2),
+      Bytes.toBytes(VALUE_4)));
+    cellSetModel.addRow(rowModel);
+    StringWriter writer = new StringWriter();
+    xmlMarshaller.marshal(cellSetModel, writer);
+    Response response = client.put(path, Constants.MIMETYPE_XML,
+      Bytes.toBytes(writer.toString()));
+    Thread.yield();
+
+    // make sure the fake row was not actually created
+    response = client.get(path, Constants.MIMETYPE_XML);
+    assertEquals(response.getCode(), 404);
+
+    // check that all of the values were created
+    StringBuilder query = new StringBuilder();
+    query.append('/');
+    query.append(TABLE);
+    query.append('/');
+    query.append("testrow*");
+    query.append('/');
+    query.append(COLUMN_1);
+    response = client.get(query.toString(), Constants.MIMETYPE_XML);
+    assertEquals(response.getCode(), 200);
+    assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
+    CellSetModel cellSet = (CellSetModel)
+      xmlUnmarshaller.unmarshal(new ByteArrayInputStream(response.getBody()));
+    List<RowModel> rows = cellSet.getRows();
+    assertTrue(rows.size() == 2);
+    for (RowModel row : rows) {
+      assertTrue(row.getCells().size() == 1);
+      assertEquals(COLUMN_1, Bytes.toString(row.getCells().get(0).getColumn()));
+    }
+    response = deleteRow(TABLE, ROW_1);
+    assertEquals(response.getCode(), 200);
+    response = deleteRow(TABLE, ROW_2);
+    assertEquals(response.getCode(), 200);
+  }
 }
 

Modified: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java?rev=1558995&r1=1558994&r2=1558995&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java Fri Jan 17 02:33:16 2014
@@ -73,11 +73,11 @@ public class TestScannerResource {
   private static int expectedRows2;
   private static Configuration conf;
 
-  private static int insertData(String tableName, String column, double prob)
+  static int insertData(Configuration conf, String tableName, String column, double prob)
       throws IOException {
     Random rng = new Random();
     int count = 0;
-    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    HTable table = new HTable(conf, tableName);
     byte[] k = new byte[3];
     byte [][] famAndQf = KeyValue.parseColumn(Bytes.toBytes(column));
     for (byte b1 = 'a'; b1 < 'z'; b1++) {
@@ -97,10 +97,11 @@ public class TestScannerResource {
       }
     }
     table.flushCommits();
+    table.close();
     return count;
   }
 
-  private static int countCellSet(CellSetModel model) {
+  static int countCellSet(CellSetModel model) {
     int count = 0;
     Iterator<RowModel> rows = model.getRows().iterator();
     while (rows.hasNext()) {
@@ -170,8 +171,8 @@ public class TestScannerResource {
     htd.addFamily(new HColumnDescriptor(CFA));
     htd.addFamily(new HColumnDescriptor(CFB));
     admin.createTable(htd);
-    expectedRows1 = insertData(TABLE, COLUMN_1, 1.0);
-    expectedRows2 = insertData(TABLE, COLUMN_2, 0.5);
+    expectedRows1 = insertData(TEST_UTIL.getConfiguration(), TABLE, COLUMN_1, 1.0);
+    expectedRows2 = insertData(TEST_UTIL.getConfiguration(), TABLE, COLUMN_2, 0.5);
   }
 
   @AfterClass

Added: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestTableScan.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestTableScan.java?rev=1558995&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestTableScan.java (added)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestTableScan.java Fri Jan 17 02:33:16 2014
@@ -0,0 +1,508 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.rest;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.ws.rs.core.MediaType;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.parsers.SAXParserFactory;
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.rest.client.Client;
+import org.apache.hadoop.hbase.rest.client.Cluster;
+import org.apache.hadoop.hbase.rest.client.Response;
+import org.apache.hadoop.hbase.rest.model.CellModel;
+import org.apache.hadoop.hbase.rest.model.CellSetModel;
+import org.apache.hadoop.hbase.rest.model.RowModel;
+import org.apache.hadoop.hbase.rest.provider.JacksonProvider;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.xml.sax.InputSource;
+import org.xml.sax.XMLReader;
+
+@Category(MediumTests.class)
+public class TestTableScan {
+
+  private static final String TABLE = "TestScanResource";
+  private static final String CFA = "a";
+  private static final String CFB = "b";
+  private static final String COLUMN_1 = CFA + ":1";
+  private static final String COLUMN_2 = CFB + ":2";
+  private static Client client;
+  private static int expectedRows1;
+  private static int expectedRows2;
+  private static Configuration conf;
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final HBaseRESTTestingUtility REST_TEST_UTIL =
+    new HBaseRESTTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    TEST_UTIL.startMiniCluster();
+    REST_TEST_UTIL.startServletContainer(conf);
+    client = new Client(new Cluster().add("localhost",
+      REST_TEST_UTIL.getServletPort()));
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    if (!admin.tableExists(TABLE)) {
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
+    htd.addFamily(new HColumnDescriptor(CFA));
+    htd.addFamily(new HColumnDescriptor(CFB));
+    admin.createTable(htd);
+    expectedRows1 = TestScannerResource.insertData(conf, TABLE, COLUMN_1, 1.0);
+    expectedRows2 = TestScannerResource.insertData(conf, TABLE, COLUMN_2, 0.5);
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.getHBaseAdmin().disableTable(TABLE);
+    TEST_UTIL.getHBaseAdmin().deleteTable(TABLE);
+    REST_TEST_UTIL.shutdownServletContainer();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testSimpleScannerXML() throws IOException, JAXBException, XMLStreamException {
+    // Test scanning particular columns
+    StringBuilder builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_LIMIT + "=10");
+    Response response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_XML);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
+    JAXBContext ctx = JAXBContext.newInstance(CellSetModel.class);
+    Unmarshaller ush = ctx.createUnmarshaller();
+    CellSetModel model = (CellSetModel) ush.unmarshal(response.getStream());
+    int count = TestScannerResource.countCellSet(model);
+    assertEquals(10, count);
+    checkRowsNotNull(model);
+
+    //Test with no limit.
+    builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_XML);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type")); 
+    model = (CellSetModel) ush.unmarshal(response.getStream());
+    count = TestScannerResource.countCellSet(model);
+    assertEquals(expectedRows1, count);
+    checkRowsNotNull(model);
+
+    //Test with start and end row.
+    builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_START_ROW + "=aaa");
+    builder.append("&");
+    builder.append(Constants.SCAN_END_ROW + "=aay");
+    response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_XML);
+    assertEquals(200, response.getCode());
+    model = (CellSetModel) ush.unmarshal(response.getStream());
+    count = TestScannerResource.countCellSet(model);
+    RowModel startRow = model.getRows().get(0);
+    assertEquals("aaa", Bytes.toString(startRow.getKey()));
+    RowModel endRow = model.getRows().get(model.getRows().size() - 1);
+    assertEquals("aax", Bytes.toString(endRow.getKey()));
+    assertEquals(24, count);
+    checkRowsNotNull(model);
+
+    //Test with start row and limit.
+    builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_START_ROW + "=aaa");
+    builder.append("&");
+    builder.append(Constants.SCAN_LIMIT + "=15");
+    response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_XML);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
+    model = (CellSetModel) ush.unmarshal(response.getStream());
+    startRow = model.getRows().get(0);
+    assertEquals("aaa", Bytes.toString(startRow.getKey()));
+    count = TestScannerResource.countCellSet(model);
+    assertEquals(15, count);
+    checkRowsNotNull(model);
+
+  }
+
+  @Test
+  public void testSimpleScannerJson() throws IOException, JAXBException {
+    // Test scanning particular columns with limit.
+    StringBuilder builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_LIMIT + "=20");
+    Response response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_JSON);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
+    ObjectMapper mapper = new JacksonProvider()
+        .locateMapper(CellSetModel.class, MediaType.APPLICATION_JSON_TYPE);
+    CellSetModel model = mapper.readValue(response.getStream(), CellSetModel.class);
+    int count = TestScannerResource.countCellSet(model);
+    assertEquals(20, count);
+    checkRowsNotNull(model);
+
+    //Test scanning with no limit.
+    builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_2);
+    response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_JSON);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
+    model = mapper.readValue(response.getStream(), CellSetModel.class);
+    count = TestScannerResource.countCellSet(model);
+    assertEquals(expectedRows2, count);
+    checkRowsNotNull(model);
+
+    //Test with start row and end row.
+    builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_START_ROW + "=aaa");
+    builder.append("&");
+    builder.append(Constants.SCAN_END_ROW + "=aay");
+    response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_JSON);
+    assertEquals(200, response.getCode());
+    model = mapper.readValue(response.getStream(), CellSetModel.class);
+    RowModel startRow = model.getRows().get(0);
+    assertEquals("aaa", Bytes.toString(startRow.getKey()));
+    RowModel endRow = model.getRows().get(model.getRows().size() - 1);
+    assertEquals("aax", Bytes.toString(endRow.getKey()));
+    count = TestScannerResource.countCellSet(model);
+    assertEquals(24, count);
+    checkRowsNotNull(model);
+  }
+
+  /**
+   * An example to scan using listener in unmarshaller for XML.
+   * @throws Exception the exception
+   */
+  @Test
+  public void testScanUsingListenerUnmarshallerXML() throws Exception {
+    StringBuilder builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_LIMIT + "=10");
+    Response response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_XML);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
+    JAXBContext context = JAXBContext.newInstance(ClientSideCellSetModel.class, RowModel.class,
+      CellModel.class);
+    Unmarshaller unmarshaller = context.createUnmarshaller();
+
+    final ClientSideCellSetModel.Listener listener = new ClientSideCellSetModel.Listener() {
+      @Override
+      public void handleRowModel(ClientSideCellSetModel helper, RowModel row) {
+        assertTrue(row.getKey() != null);
+        assertTrue(row.getCells().size() > 0);
+      }
+    };
+
+    // install the callback on all ClientSideCellSetModel instances
+    unmarshaller.setListener(new Unmarshaller.Listener() {
+        public void beforeUnmarshal(Object target, Object parent) {
+            if (target instanceof ClientSideCellSetModel) {
+                ((ClientSideCellSetModel) target).setCellSetModelListener(listener);
+            }
+        }
+
+        public void afterUnmarshal(Object target, Object parent) {
+            if (target instanceof ClientSideCellSetModel) {
+                ((ClientSideCellSetModel) target).setCellSetModelListener(null);
+            }
+        }
+    });
+
+    // create a new XML parser
+    SAXParserFactory factory = SAXParserFactory.newInstance();
+    factory.setNamespaceAware(true);
+    XMLReader reader = factory.newSAXParser().getXMLReader();
+    reader.setContentHandler(unmarshaller.getUnmarshallerHandler());
+    assertFalse(ClientSideCellSetModel.listenerInvoked);
+    reader.parse(new InputSource(response.getStream()));
+    assertTrue(ClientSideCellSetModel.listenerInvoked);
+
+  }
+
+  @Test
+  public void testStreamingJSON() throws Exception {
+    // Test scanning particular columns with limit.
+    StringBuilder builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_LIMIT + "=20");
+    Response response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_JSON);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
+    ObjectMapper mapper = new JacksonProvider()
+        .locateMapper(CellSetModel.class, MediaType.APPLICATION_JSON_TYPE);
+    CellSetModel model = mapper.readValue(response.getStream(), CellSetModel.class);
+    int count = TestScannerResource.countCellSet(model);
+    assertEquals(20, count);
+    checkRowsNotNull(model);
+
+    //Test scanning with no limit.
+    builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_2);
+    response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_JSON);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
+    model = mapper.readValue(response.getStream(), CellSetModel.class);
+    count = TestScannerResource.countCellSet(model);
+    assertEquals(expectedRows2, count);
+    checkRowsNotNull(model);
+
+    //Test with start row and end row.
+    builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_START_ROW + "=aaa");
+    builder.append("&");
+    builder.append(Constants.SCAN_END_ROW + "=aay");
+    response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_JSON);
+    assertEquals(200, response.getCode());
+
+    count = 0;
+    JsonFactory jfactory = new JsonFactory(mapper);
+    JsonParser jParser = jfactory.createJsonParser(response.getStream());
+    boolean found = false;
+    while (jParser.nextToken() != JsonToken.END_OBJECT) {
+      if(jParser.getCurrentToken() == JsonToken.START_OBJECT && found) {
+        RowModel row = jParser.readValueAs(RowModel.class);
+        assertNotNull(row.getKey());
+        for (int i = 0; i < row.getCells().size(); i++) {
+          if (count == 0) {
+            assertEquals("aaa", Bytes.toString(row.getKey()));
+          }
+          if (count == 23) {
+            assertEquals("aax", Bytes.toString(row.getKey()));
+          }
+          count++;
+        }
+        jParser.skipChildren();
+      } else {
+        found = jParser.getCurrentToken() == JsonToken.START_ARRAY;
+      }
+    }
+    assertEquals(24, count);
+  }
+
+  @Test
+  public void testSimpleScannerProtobuf() throws Exception {
+    StringBuilder builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_LIMIT + "=15");
+    Response response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_PROTOBUF);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type"));
+    int rowCount = readProtobufStream(response.getStream());
+    assertEquals(15, rowCount);
+
+  //Test with start row and end row.
+    builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_START_ROW + "=aaa");
+    builder.append("&");
+    builder.append(Constants.SCAN_END_ROW + "=aay");
+    response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_PROTOBUF);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type"));
+    rowCount = readProtobufStream(response.getStream());
+    assertEquals(24, rowCount);
+  }
+
+  private void checkRowsNotNull(CellSetModel model) {
+    for (RowModel row: model.getRows()) {
+      assertTrue(row.getKey() != null);
+      assertTrue(row.getCells().size() > 0);
+    }
+  }
+
+  /**
+   * Read protobuf stream.
+   * @param inputStream the input stream
+   * @return The number of rows in the cell set model.
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  public int readProtobufStream(InputStream inputStream) throws IOException{
+    DataInputStream stream = new DataInputStream(inputStream);
+    CellSetModel model = null;
+    int rowCount = 0;
+    try {
+      while (true) {
+        byte[] lengthBytes = new byte[2];
+        int readBytes = stream.read(lengthBytes);
+        if (readBytes == -1) {
+          break;
+        }
+        assertEquals(2, readBytes);
+        int length = Bytes.toShort(lengthBytes);
+        byte[] cellset = new byte[length];
+        stream.read(cellset);
+        model = new CellSetModel();
+        model.getObjectFromMessage(cellset);
+        checkRowsNotNull(model);
+        rowCount = rowCount + TestScannerResource.countCellSet(model);
+      }
+    } catch (EOFException exp) {
+      exp.printStackTrace();
+    } finally {
+      stream.close();
+    }
+    return rowCount;
+  }
+
+  @Test
+  public void testScanningUnknownColumnJson() throws IOException, JAXBException {
+    // Test scanning particular columns with limit.
+    StringBuilder builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=a:test");
+    Response response = client.get("/" + TABLE  + builder.toString(),
+      Constants.MIMETYPE_JSON);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
+    ObjectMapper mapper = new JacksonProvider().locateMapper(CellSetModel.class,
+      MediaType.APPLICATION_JSON_TYPE);
+    CellSetModel model = mapper.readValue(response.getStream(), CellSetModel.class);
+    int count = TestScannerResource.countCellSet(model);
+    assertEquals(0, count);
+  }
+
+  /**
+   * The Class ClientSideCellSetModel which mimics cell set model, and contains listener to perform
+   * user defined operations on the row model.
+   */
+  @XmlRootElement(name = "CellSet")
+  @XmlAccessorType(XmlAccessType.FIELD)
+  public static class ClientSideCellSetModel implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * This list is not a real list; instead it will notify a listener whenever JAXB has
+     * unmarshalled the next row.
+     */
+    @XmlElement(name="Row")
+    private List<RowModel> row;
+
+    static boolean listenerInvoked = false;
+
+    /**
+     * Install a listener for row model on this object. If l is null, the listener
+     * is removed again.
+     */
+    public void setCellSetModelListener(final Listener l) {
+        row = (l == null) ? null : new ArrayList<RowModel>() {
+        private static final long serialVersionUID = 1L;
+
+            public boolean add(RowModel o) {
+                l.handleRowModel(ClientSideCellSetModel.this, o);
+                listenerInvoked = true;
+                return false;
+            }
+        };
+    }
+
+    /**
+     * This listener is invoked every time a new row model is unmarshalled.
+     */
+    public static interface Listener {
+        void handleRowModel(ClientSideCellSetModel helper, RowModel rowModel);
+    }
+  }
+}
+
+
+