Posted to commits@hbase.apache.org by ec...@apache.org on 2014/10/10 18:53:17 UTC
[36/38] HBASE-12197 Move rest to its own module
http://git-wip-us.apache.org/repos/asf/hbase/blob/876617bd/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java
new file mode 100644
index 0000000..5cc2c7b
--- /dev/null
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.rest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.core.UriInfo;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.rest.model.CellModel;
+import org.apache.hadoop.hbase.rest.model.RowModel;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+@InterfaceAudience.Private
+public class TableScanResource extends ResourceBase {
+
+ private static final Log LOG = LogFactory.getLog(TableScanResource.class);
+ TableResource tableResource;
+ ResultScanner results;
+ int userRequestedLimit;
+
+ public TableScanResource(ResultScanner scanner, int userRequestedLimit) throws IOException {
+ super();
+ this.results = scanner;
+ this.userRequestedLimit = userRequestedLimit;
+ }
+
+ @GET
+ @Produces({ Constants.MIMETYPE_XML, Constants.MIMETYPE_JSON })
+ public CellSetModelStream get(final @Context UriInfo uriInfo) {
+ servlet.getMetrics().incrementRequests(1);
+ final int rowsToSend = userRequestedLimit;
+ servlet.getMetrics().incrementSucessfulScanRequests(1);
+ final Iterator<Result> itr = results.iterator();
+ return new CellSetModelStream(new ArrayList<RowModel>() {
+ public Iterator<RowModel> iterator() {
+ return new Iterator<RowModel>() {
+ int count = rowsToSend;
+
+ @Override
+ public boolean hasNext() {
+ if (count > 0) {
+ return itr.hasNext();
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException(
+ "Remove method cannot be used in CellSetModelStream");
+ }
+
+ @Override
+ public RowModel next() {
+ Result rs = itr.next();
+ if ((rs == null) || (count <= 0)) {
+ return null;
+ }
+ byte[] rowKey = rs.getRow();
+ RowModel rModel = new RowModel(rowKey);
+ List<Cell> kvs = rs.listCells();
+ for (Cell kv : kvs) {
+ rModel.addCell(new CellModel(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
+ kv.getTimestamp(), CellUtil.cloneValue(kv)));
+ }
+ count--;
+ return rModel;
+ }
+ };
+ }
+ });
+ }
+
+ @GET
+ @Produces({ Constants.MIMETYPE_PROTOBUF, Constants.MIMETYPE_PROTOBUF_IETF })
+ public Response getProtobuf(
+ final @Context UriInfo uriInfo,
+ final @PathParam("scanspec") String scanSpec,
+ final @HeaderParam("Accept") String contentType,
+ @DefaultValue(Integer.MAX_VALUE + "") @QueryParam(Constants.SCAN_LIMIT) int userRequestedLimit,
+ @DefaultValue("") @QueryParam(Constants.SCAN_START_ROW) String startRow,
+ @DefaultValue("") @QueryParam(Constants.SCAN_END_ROW) String endRow,
+ @DefaultValue("column") @QueryParam(Constants.SCAN_COLUMN) List<String> column,
+ @DefaultValue("1") @QueryParam(Constants.SCAN_MAX_VERSIONS) int maxVersions,
+ @DefaultValue("-1") @QueryParam(Constants.SCAN_BATCH_SIZE) int batchSize,
+ @DefaultValue("0") @QueryParam(Constants.SCAN_START_TIME) long startTime,
+ @DefaultValue(Long.MAX_VALUE + "") @QueryParam(Constants.SCAN_END_TIME) long endTime,
+ @DefaultValue("true") @QueryParam(Constants.SCAN_BATCH_SIZE) boolean cacheBlocks) {
+ servlet.getMetrics().incrementRequests(1);
+ try {
+ int fetchSize = this.servlet.getConfiguration().getInt(Constants.SCAN_FETCH_SIZE, 10);
+ ProtobufStreamingUtil stream = new ProtobufStreamingUtil(this.results, contentType,
+ userRequestedLimit, fetchSize);
+ servlet.getMetrics().incrementSucessfulScanRequests(1);
+ ResponseBuilder response = Response.ok(stream);
+ response.header("content-type", contentType);
+ return response.build();
+ } catch (Exception exp) {
+ servlet.getMetrics().incrementFailedScanRequests(1);
+ LOG.warn(exp);
+ // processException translates exp into a thrown WebApplicationException,
+ // so anything after it is unreachable; log first
+ processException(exp);
+ return null;
+ }
+ }
+
+ @XmlRootElement(name = "CellSet")
+ @XmlAccessorType(XmlAccessType.FIELD)
+ public static class CellSetModelStream {
+ // JAXB needs an arraylist for streaming
+ @XmlElement(name = "Row")
+ @JsonIgnore
+ private ArrayList<RowModel> Row;
+
+ public CellSetModelStream() {
+ }
+
+ public CellSetModelStream(final ArrayList<RowModel> rowList) {
+ this.Row = rowList;
+ }
+
+ // jackson needs an iterator for streaming
+ @JsonProperty("Row")
+ public Iterator<RowModel> getIterator() {
+ return Row.iterator();
+ }
+ }
+}
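The XML/JSON GET above streams its response: rather than materializing all rows, it hands JAXB/Jackson a lazy Iterator that decorates the scanner's iterator with the user-requested row limit. A minimal standalone sketch of that decorator pattern (the class and names here are illustrative, not part of the commit):

    import java.util.Arrays;
    import java.util.Iterator;

    public class LimitedIterator<T> implements Iterator<T> {
      private final Iterator<T> delegate;
      private int remaining; // plays the role of 'count' in TableScanResource

      public LimitedIterator(Iterator<T> delegate, int limit) {
        this.delegate = delegate;
        this.remaining = limit;
      }

      @Override
      public boolean hasNext() {
        // exhausted either when the limit is hit or the delegate runs dry
        return remaining > 0 && delegate.hasNext();
      }

      @Override
      public T next() {
        remaining--;
        return delegate.next();
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException("remove not supported");
      }

      public static void main(String[] args) {
        Iterator<String> rows = new LimitedIterator<String>(
            Arrays.asList("row1", "row2", "row3").iterator(), 2);
        while (rows.hasNext()) {
          System.out.println(rows.next()); // prints row1, row2
        }
      }
    }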
http://git-wip-us.apache.org/repos/asf/hbase/blob/876617bd/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/VersionResource.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/VersionResource.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/VersionResource.java
new file mode 100644
index 0000000..ae93825
--- /dev/null
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/VersionResource.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rest;
+
+import java.io.IOException;
+
+import javax.servlet.ServletContext;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.CacheControl;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+import javax.ws.rs.core.Response.ResponseBuilder;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.rest.model.VersionModel;
+
+/**
+ * Implements REST software version reporting
+ * <p>
+ * <tt>/version/rest</tt>
+ * <p>
+ * <tt>/version</tt> (alias for <tt>/version/rest</tt>)
+ */
+@InterfaceAudience.Private
+public class VersionResource extends ResourceBase {
+
+ private static final Log LOG = LogFactory.getLog(VersionResource.class);
+
+ static CacheControl cacheControl;
+ static {
+ cacheControl = new CacheControl();
+ cacheControl.setNoCache(true);
+ cacheControl.setNoTransform(false);
+ }
+
+ /**
+ * Constructor
+ * @throws IOException
+ */
+ public VersionResource() throws IOException {
+ super();
+ }
+
+ /**
+ * Build a response for a version request.
+ * @param context servlet context
+ * @param uriInfo (JAX-RS context variable) request URL
+ * @return a response for a version request
+ */
+ @GET
+ @Produces({MIMETYPE_TEXT, MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF,
+ MIMETYPE_PROTOBUF_IETF})
+ public Response get(final @Context ServletContext context,
+ final @Context UriInfo uriInfo) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("GET " + uriInfo.getAbsolutePath());
+ }
+ servlet.getMetrics().incrementRequests(1);
+ ResponseBuilder response = Response.ok(new VersionModel(context));
+ response.cacheControl(cacheControl);
+ servlet.getMetrics().incrementSucessfulGetRequests(1);
+ return response.build();
+ }
+
+ /**
+ * Dispatch to StorageClusterVersionResource
+ */
+ @Path("cluster")
+ public StorageClusterVersionResource getClusterVersionResource()
+ throws IOException {
+ return new StorageClusterVersionResource();
+ }
+
+ /**
+ * Dispatch <tt>/version/rest</tt> to self.
+ */
+ @Path("rest")
+ public VersionResource getVersionResource() {
+ return this;
+ }
+}
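As a usage illustration (not part of this patch), the version endpoint above can be exercised with the rest.client classes added later in this commit; the host and port are assumptions for a locally running REST gateway:

    import org.apache.hadoop.hbase.rest.Constants;
    import org.apache.hadoop.hbase.rest.client.Client;
    import org.apache.hadoop.hbase.rest.client.Cluster;
    import org.apache.hadoop.hbase.rest.client.Response;

    public class VersionCheck {
      public static void main(String[] args) throws Exception {
        // assumes a REST gateway on localhost:8080
        Client client = new Client(new Cluster().add("localhost", 8080));
        try {
          // "/version" is an alias for "/version/rest" per the javadoc above
          Response response = client.get("/version/rest", Constants.MIMETYPE_JSON);
          System.out.println(response.getCode()); // 200 on success
          System.out.println(new String(response.getBody()));
        } finally {
          client.shutdown();
        }
      }
    }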
http://git-wip-us.apache.org/repos/asf/hbase/blob/876617bd/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java
new file mode 100644
index 0000000..ebedf57
--- /dev/null
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java
@@ -0,0 +1,525 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rest.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.httpclient.Header;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpMethod;
+import org.apache.commons.httpclient.HttpVersion;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.URI;
+import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
+import org.apache.commons.httpclient.methods.DeleteMethod;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.HeadMethod;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.PutMethod;
+import org.apache.commons.httpclient.params.HttpClientParams;
+import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * A wrapper around HttpClient which provides some useful functions and
+ * semantics for interacting with the REST gateway.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class Client {
+ public static final Header[] EMPTY_HEADER_ARRAY = new Header[0];
+
+ private static final Log LOG = LogFactory.getLog(Client.class);
+
+ private HttpClient httpClient;
+ private Cluster cluster;
+ private boolean sslEnabled;
+
+ private Map<String, String> extraHeaders;
+
+ /**
+ * Default Constructor
+ */
+ public Client() {
+ this(null);
+ }
+
+ private void initialize(Cluster cluster, boolean sslEnabled) {
+ this.cluster = cluster;
+ this.sslEnabled = sslEnabled;
+ MultiThreadedHttpConnectionManager manager =
+ new MultiThreadedHttpConnectionManager();
+ HttpConnectionManagerParams managerParams = manager.getParams();
+ managerParams.setConnectionTimeout(2000); // 2 s
+ managerParams.setDefaultMaxConnectionsPerHost(10);
+ managerParams.setMaxTotalConnections(100);
+ extraHeaders = new ConcurrentHashMap<String, String>();
+ this.httpClient = new HttpClient(manager);
+ HttpClientParams clientParams = httpClient.getParams();
+ clientParams.setVersion(HttpVersion.HTTP_1_1);
+ }
+
+ /**
+ * Constructor
+ * @param cluster the cluster definition
+ */
+ public Client(Cluster cluster) {
+ initialize(cluster, false);
+ }
+
+ /**
+ * Constructor
+ * @param cluster the cluster definition
+ * @param sslEnabled enable SSL or not
+ */
+ public Client(Cluster cluster, boolean sslEnabled) {
+ initialize(cluster, sslEnabled);
+ }
+
+ /**
+ * Shut down the client. Close any open persistent connections.
+ */
+ public void shutdown() {
+ MultiThreadedHttpConnectionManager manager =
+ (MultiThreadedHttpConnectionManager) httpClient.getHttpConnectionManager();
+ manager.shutdown();
+ }
+
+ /**
+ * @return the wrapped HttpClient
+ */
+ public HttpClient getHttpClient() {
+ return httpClient;
+ }
+
+ /**
+ * Add an extra header. Extra headers are applied to every HTTP method
+ * until they are removed. If a header is no longer needed, the client
+ * must remove it explicitly.
+ */
+ public void addExtraHeader(final String name, final String value) {
+ extraHeaders.put(name, value);
+ }
+
+ /**
+ * Get an extra header value.
+ */
+ public String getExtraHeader(final String name) {
+ return extraHeaders.get(name);
+ }
+
+ /**
+ * Get all extra headers (read-only).
+ */
+ public Map<String, String> getExtraHeaders() {
+ return Collections.unmodifiableMap(extraHeaders);
+ }
+
+ /**
+ * Remove an extra header.
+ */
+ public void removeExtraHeader(final String name) {
+ extraHeaders.remove(name);
+ }
+
+ /**
+ * Execute a transaction method given only the path. Will select at random
+ * one of the members of the supplied cluster definition and iterate through
+ * the list until a transaction can be successfully completed. The
+ * definition of success here is a complete HTTP transaction, irrespective
+ * of result code.
+ * @param cluster the cluster definition
+ * @param method the transaction method
+ * @param headers HTTP header values to send
+ * @param path the properly urlencoded path
+ * @return the HTTP response code
+ * @throws IOException
+ */
+ public int executePathOnly(Cluster cluster, HttpMethod method,
+ Header[] headers, String path) throws IOException {
+ IOException lastException;
+ if (cluster.nodes.size() < 1) {
+ throw new IOException("Cluster is empty");
+ }
+ int start = (int)Math.round((cluster.nodes.size() - 1) * Math.random());
+ int i = start;
+ do {
+ cluster.lastHost = cluster.nodes.get(i);
+ try {
+ StringBuilder sb = new StringBuilder();
+ if (sslEnabled) {
+ sb.append("https://");
+ } else {
+ sb.append("http://");
+ }
+ sb.append(cluster.lastHost);
+ sb.append(path);
+ URI uri = new URI(sb.toString(), true);
+ return executeURI(method, headers, uri.toString());
+ } catch (IOException e) {
+ lastException = e;
+ }
+ } while ((i = (i + 1) % cluster.nodes.size()) != start); // wrap around so every node is tried once
+ throw lastException;
+ }
+
+ /**
+ * Execute a transaction method given a complete URI.
+ * @param method the transaction method
+ * @param headers HTTP header values to send
+ * @param uri a properly urlencoded URI
+ * @return the HTTP response code
+ * @throws IOException
+ */
+ public int executeURI(HttpMethod method, Header[] headers, String uri)
+ throws IOException {
+ method.setURI(new URI(uri, true));
+ for (Map.Entry<String, String> e: extraHeaders.entrySet()) {
+ method.addRequestHeader(e.getKey(), e.getValue());
+ }
+ if (headers != null) {
+ for (Header header: headers) {
+ method.addRequestHeader(header);
+ }
+ }
+ long startTime = System.currentTimeMillis();
+ int code = httpClient.executeMethod(method);
+ long endTime = System.currentTimeMillis();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(method.getName() + " " + uri + " " + code + " " +
+ method.getStatusText() + " in " + (endTime - startTime) + " ms");
+ }
+ return code;
+ }
+
+ /**
+ * Execute a transaction method. Will call either <tt>executePathOnly</tt>
+ * or <tt>executeURI</tt> depending on whether a path only is supplied in
+ * 'path', or if a complete URI is passed instead, respectively.
+ * @param cluster the cluster definition
+ * @param method the HTTP method
+ * @param headers HTTP header values to send
+ * @param path the properly urlencoded path or URI
+ * @return the HTTP response code
+ * @throws IOException
+ */
+ public int execute(Cluster cluster, HttpMethod method, Header[] headers,
+ String path) throws IOException {
+ if (path.startsWith("/")) {
+ return executePathOnly(cluster, method, headers, path);
+ }
+ return executeURI(method, headers, path);
+ }
+
+ /**
+ * @return the cluster definition
+ */
+ public Cluster getCluster() {
+ return cluster;
+ }
+
+ /**
+ * @param cluster the cluster definition
+ */
+ public void setCluster(Cluster cluster) {
+ this.cluster = cluster;
+ }
+
+ /**
+ * Send a HEAD request
+ * @param path the path or URI
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response head(String path) throws IOException {
+ return head(cluster, path, null);
+ }
+
+ /**
+ * Send a HEAD request
+ * @param cluster the cluster definition
+ * @param path the path or URI
+ * @param headers the HTTP headers to include in the request
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response head(Cluster cluster, String path, Header[] headers)
+ throws IOException {
+ HeadMethod method = new HeadMethod();
+ try {
+ int code = execute(cluster, method, headers, path);
+ headers = method.getResponseHeaders();
+ return new Response(code, headers, null);
+ } finally {
+ method.releaseConnection();
+ }
+ }
+
+ /**
+ * Send a GET request
+ * @param path the path or URI
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response get(String path) throws IOException {
+ return get(cluster, path);
+ }
+
+ /**
+ * Send a GET request
+ * @param cluster the cluster definition
+ * @param path the path or URI
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response get(Cluster cluster, String path) throws IOException {
+ return get(cluster, path, EMPTY_HEADER_ARRAY);
+ }
+
+ /**
+ * Send a GET request
+ * @param path the path or URI
+ * @param accept Accept header value
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response get(String path, String accept) throws IOException {
+ return get(cluster, path, accept);
+ }
+
+ /**
+ * Send a GET request
+ * @param cluster the cluster definition
+ * @param path the path or URI
+ * @param accept Accept header value
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response get(Cluster cluster, String path, String accept)
+ throws IOException {
+ Header[] headers = new Header[1];
+ headers[0] = new Header("Accept", accept);
+ return get(cluster, path, headers);
+ }
+
+ /**
+ * Send a GET request
+ * @param path the path or URI
+ * @param headers the HTTP headers to include in the request,
+ * <tt>Accept</tt> must be supplied
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response get(String path, Header[] headers) throws IOException {
+ return get(cluster, path, headers);
+ }
+
+ /**
+ * Send a GET request
+ * @param c the cluster definition
+ * @param path the path or URI
+ * @param headers the HTTP headers to include in the request
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response get(Cluster c, String path, Header[] headers)
+ throws IOException {
+ GetMethod method = new GetMethod();
+ try {
+ int code = execute(c, method, headers, path);
+ headers = method.getResponseHeaders();
+ byte[] body = method.getResponseBody();
+ InputStream in = method.getResponseBodyAsStream();
+ return new Response(code, headers, body, in);
+ } finally {
+ method.releaseConnection();
+ }
+ }
+
+ /**
+ * Send a PUT request
+ * @param path the path or URI
+ * @param contentType the content MIME type
+ * @param content the content bytes
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response put(String path, String contentType, byte[] content)
+ throws IOException {
+ return put(cluster, path, contentType, content);
+ }
+
+ /**
+ * Send a PUT request
+ * @param cluster the cluster definition
+ * @param path the path or URI
+ * @param contentType the content MIME type
+ * @param content the content bytes
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response put(Cluster cluster, String path, String contentType,
+ byte[] content) throws IOException {
+ Header[] headers = new Header[1];
+ headers[0] = new Header("Content-Type", contentType);
+ return put(cluster, path, headers, content);
+ }
+
+ /**
+ * Send a PUT request
+ * @param path the path or URI
+ * @param headers the HTTP headers to include, <tt>Content-Type</tt> must be
+ * supplied
+ * @param content the content bytes
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response put(String path, Header[] headers, byte[] content)
+ throws IOException {
+ return put(cluster, path, headers, content);
+ }
+
+ /**
+ * Send a PUT request
+ * @param cluster the cluster definition
+ * @param path the path or URI
+ * @param headers the HTTP headers to include, <tt>Content-Type</tt> must be
+ * supplied
+ * @param content the content bytes
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response put(Cluster cluster, String path, Header[] headers,
+ byte[] content) throws IOException {
+ PutMethod method = new PutMethod();
+ try {
+ method.setRequestEntity(new ByteArrayRequestEntity(content));
+ int code = execute(cluster, method, headers, path);
+ headers = method.getResponseHeaders();
+ content = method.getResponseBody();
+ return new Response(code, headers, content);
+ } finally {
+ method.releaseConnection();
+ }
+ }
+
+ /**
+ * Send a POST request
+ * @param path the path or URI
+ * @param contentType the content MIME type
+ * @param content the content bytes
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response post(String path, String contentType, byte[] content)
+ throws IOException {
+ return post(cluster, path, contentType, content);
+ }
+
+ /**
+ * Send a POST request
+ * @param cluster the cluster definition
+ * @param path the path or URI
+ * @param contentType the content MIME type
+ * @param content the content bytes
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response post(Cluster cluster, String path, String contentType,
+ byte[] content) throws IOException {
+ Header[] headers = new Header[1];
+ headers[0] = new Header("Content-Type", contentType);
+ return post(cluster, path, headers, content);
+ }
+
+ /**
+ * Send a POST request
+ * @param path the path or URI
+ * @param headers the HTTP headers to include, <tt>Content-Type</tt> must be
+ * supplied
+ * @param content the content bytes
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response post(String path, Header[] headers, byte[] content)
+ throws IOException {
+ return post(cluster, path, headers, content);
+ }
+
+ /**
+ * Send a POST request
+ * @param cluster the cluster definition
+ * @param path the path or URI
+ * @param headers the HTTP headers to include, <tt>Content-Type</tt> must be
+ * supplied
+ * @param content the content bytes
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response post(Cluster cluster, String path, Header[] headers,
+ byte[] content) throws IOException {
+ PostMethod method = new PostMethod();
+ try {
+ method.setRequestEntity(new ByteArrayRequestEntity(content));
+ int code = execute(cluster, method, headers, path);
+ headers = method.getResponseHeaders();
+ content = method.getResponseBody();
+ return new Response(code, headers, content);
+ } finally {
+ method.releaseConnection();
+ }
+ }
+
+ /**
+ * Send a DELETE request
+ * @param path the path or URI
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response delete(String path) throws IOException {
+ return delete(cluster, path);
+ }
+
+ /**
+ * Send a DELETE request
+ * @param cluster the cluster definition
+ * @param path the path or URI
+ * @return a Response object with response detail
+ * @throws IOException
+ */
+ public Response delete(Cluster cluster, String path) throws IOException {
+ DeleteMethod method = new DeleteMethod();
+ try {
+ int code = execute(cluster, method, null, path);
+ Header[] headers = method.getResponseHeaders();
+ byte[] content = method.getResponseBody();
+ return new Response(code, headers, content);
+ } finally {
+ method.releaseConnection();
+ }
+ }
+}
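A short usage sketch of the Client (not part of the commit): a raw-value PUT plus an extra header that sticks until removed. The table, row, and header names are illustrative; Constants.MIMETYPE_BINARY is the gateway's raw octet-stream content type:

    import org.apache.hadoop.hbase.rest.Constants;
    import org.apache.hadoop.hbase.rest.client.Client;
    import org.apache.hadoop.hbase.rest.client.Cluster;
    import org.apache.hadoop.hbase.rest.client.Response;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ClientPutExample {
      public static void main(String[] args) throws Exception {
        Client client = new Client(new Cluster().add("localhost", 8080));
        // applied to every request from here on, until removed explicitly
        client.addExtraHeader("X-Trace-Id", "demo-42");
        try {
          Response r = client.put("/mytable/row1/cf:a",
              Constants.MIMETYPE_BINARY, Bytes.toBytes("value"));
          System.out.println(r.getCode()); // 200 if the cell was stored
        } finally {
          client.removeExtraHeader("X-Trace-Id");
          client.shutdown();
        }
      }
    }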
http://git-wip-us.apache.org/repos/asf/hbase/blob/876617bd/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Cluster.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Cluster.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Cluster.java
new file mode 100644
index 0000000..a2de329
--- /dev/null
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Cluster.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rest.client;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * A list of 'host:port' addresses of HTTP servers operating as a single
+ * entity, for example multiple redundant web service gateways.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class Cluster {
+ protected List<String> nodes =
+ Collections.synchronizedList(new ArrayList<String>());
+ protected String lastHost;
+
+ /**
+ * Constructor
+ */
+ public Cluster() {}
+
+ /**
+ * Constructor
+ * @param nodes a list of service locations, in 'host:port' format
+ */
+ public Cluster(List<String> nodes) {
+ this.nodes.addAll(nodes); // 'this.' is required; nodes.addAll(nodes) would add the argument to itself
+ }
+
+ /**
+ * @return true if no locations have been added, false otherwise
+ */
+ public boolean isEmpty() {
+ return nodes.isEmpty();
+ }
+
+ /**
+ * Add a node to the cluster
+ * @param node the service location in 'host:port' format
+ */
+ public Cluster add(String node) {
+ nodes.add(node);
+ return this;
+ }
+
+ /**
+ * Add a node to the cluster
+ * @param name host name
+ * @param port service port
+ */
+ public Cluster add(String name, int port) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(name);
+ sb.append(':');
+ sb.append(port);
+ return add(sb.toString());
+ }
+
+ /**
+ * Remove a node from the cluster
+ * @param node the service location in 'host:port' format
+ */
+ public Cluster remove(String node) {
+ nodes.remove(node);
+ return this;
+ }
+
+ /**
+ * Remove a node from the cluster
+ * @param name host name
+ * @param port service port
+ */
+ public Cluster remove(String name, int port) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(name);
+ sb.append(':');
+ sb.append(port);
+ return remove(sb.toString());
+ }
+}
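Because add() and remove() return this, cluster definitions chain fluently; a small sketch (hostnames are made up):

    import org.apache.hadoop.hbase.rest.client.Cluster;

    public class ClusterExample {
      public static void main(String[] args) {
        Cluster cluster = new Cluster()
            .add("rest1.example.com", 8080)
            .add("rest2.example.com", 8080)
            .remove("rest1.example.com", 8080);
        // Client.executePathOnly picks a random starting node from this list
        System.out.println(cluster.isEmpty()); // false: one node remains
      }
    }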
http://git-wip-us.apache.org/repos/asf/hbase/blob/876617bd/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteAdmin.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteAdmin.java
new file mode 100644
index 0000000..23da9c8
--- /dev/null
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteAdmin.java
@@ -0,0 +1,390 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rest.client;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.rest.Constants;
+import org.apache.hadoop.hbase.rest.model.StorageClusterStatusModel;
+import org.apache.hadoop.hbase.rest.model.StorageClusterVersionModel;
+import org.apache.hadoop.hbase.rest.model.TableListModel;
+import org.apache.hadoop.hbase.rest.model.TableSchemaModel;
+import org.apache.hadoop.hbase.rest.model.VersionModel;
+import org.apache.hadoop.hbase.util.Bytes;
+
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class RemoteAdmin {
+
+ final Client client;
+ final Configuration conf;
+ final String accessToken;
+ final int maxRetries;
+ final long sleepTime;
+
+ // This unmarshaller is needed for the /version/cluster resource, which
+ // does not support protobufs and must be requested and parsed as XML.
+ private static volatile Unmarshaller versionClusterUnmarshaller;
+
+ /**
+ * Constructor
+ *
+ * @param client
+ * @param conf
+ */
+ public RemoteAdmin(Client client, Configuration conf) {
+ this(client, conf, null);
+ }
+
+ static Unmarshaller getUnmarshaller() throws JAXBException {
+ if (versionClusterUnmarshaller == null) {
+ RemoteAdmin.versionClusterUnmarshaller = JAXBContext.newInstance(
+ StorageClusterVersionModel.class).createUnmarshaller();
+ }
+ return RemoteAdmin.versionClusterUnmarshaller;
+ }
+
+ /**
+ * Constructor
+ * @param client
+ * @param conf
+ * @param accessToken
+ */
+ public RemoteAdmin(Client client, Configuration conf, String accessToken) {
+ this.client = client;
+ this.conf = conf;
+ this.accessToken = accessToken;
+ this.maxRetries = conf.getInt("hbase.rest.client.max.retries", 10);
+ this.sleepTime = conf.getLong("hbase.rest.client.sleep", 1000);
+ }
+
+ /**
+ * @param tableName name of table to check
+ * @return true if all regions of the table are available
+ * @throws IOException if a remote or network exception occurs
+ */
+ public boolean isTableAvailable(String tableName) throws IOException {
+ return isTableAvailable(Bytes.toBytes(tableName));
+ }
+
+ /**
+ * @return a VersionModel describing the REST gateway's version
+ * @throws IOException
+ * if the endpoint does not exist, there is a timeout, or some other
+ * general failure mode
+ */
+ public VersionModel getRestVersion() throws IOException {
+
+ StringBuilder path = new StringBuilder();
+ path.append('/');
+ if (accessToken != null) {
+ path.append(accessToken);
+ path.append('/');
+ }
+
+ path.append("version/rest");
+
+ int code = 0;
+ for (int i = 0; i < maxRetries; i++) {
+ Response response = client.get(path.toString(),
+ Constants.MIMETYPE_PROTOBUF);
+ code = response.getCode();
+ switch (code) {
+ case 200:
+ VersionModel v = new VersionModel();
+ return (VersionModel) v.getObjectFromMessage(response.getBody());
+ case 404:
+ throw new IOException("REST version not found");
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ }
+ break;
+ default:
+ throw new IOException("get request to " + path.toString()
+ + " returned " + code);
+ }
+ }
+ throw new IOException("get request to " + path.toString() + " timed out");
+ }
+
+ /**
+ * @return a StorageClusterStatusModel describing the cluster's status
+ * @throws IOException if the endpoint does not exist, there is a timeout,
+ * or some other general failure mode
+ */
+ public StorageClusterStatusModel getClusterStatus() throws IOException {
+
+ StringBuilder path = new StringBuilder();
+ path.append('/');
+ if (accessToken != null) {
+ path.append(accessToken);
+ path.append('/');
+ }
+
+ path.append("status/cluster");
+
+ int code = 0;
+ for (int i = 0; i < maxRetries; i++) {
+ Response response = client.get(path.toString(),
+ Constants.MIMETYPE_PROTOBUF);
+ code = response.getCode();
+ switch (code) {
+ case 200:
+ StorageClusterStatusModel s = new StorageClusterStatusModel();
+ return (StorageClusterStatusModel) s.getObjectFromMessage(response
+ .getBody());
+ case 404:
+ throw new IOException("Cluster version not found");
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ }
+ break;
+ default:
+ throw new IOException("get request to " + path + " returned " + code);
+ }
+ }
+ throw new IOException("get request to " + path + " timed out");
+ }
+
+ /**
+ * @return a StorageClusterVersionModel with the storage cluster's version
+ * @throws IOException
+ * if the endpoint does not exist, there is a timeout, or some other
+ * general failure mode
+ */
+ public StorageClusterVersionModel getClusterVersion() throws IOException {
+
+ StringBuilder path = new StringBuilder();
+ path.append('/');
+ if (accessToken != null) {
+ path.append(accessToken);
+ path.append('/');
+ }
+
+ path.append("version/cluster");
+
+ int code = 0;
+ for (int i = 0; i < maxRetries; i++) {
+ Response response = client.get(path.toString(), Constants.MIMETYPE_XML);
+ code = response.getCode();
+ switch (code) {
+ case 200:
+ try {
+ return (StorageClusterVersionModel) getUnmarshaller().unmarshal(
+ new ByteArrayInputStream(response.getBody()));
+ } catch (JAXBException jaxbe) {
+ throw new IOException(
+ "Issue parsing StorageClusterVersionModel object in XML form: "
+ + jaxbe.getLocalizedMessage());
+ }
+ case 404:
+ throw new IOException("Cluster version not found");
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ }
+ break;
+ default:
+ throw new IOException(path.toString() + " request returned " + code);
+ }
+ }
+ throw new IOException("get request to " + path.toString()
+ + " request timed out");
+ }
+
+ /**
+ * @param tableName name of table to check
+ * @return true if all regions of the table are available
+ * @throws IOException if a remote or network exception occurs
+ */
+ public boolean isTableAvailable(byte[] tableName) throws IOException {
+ StringBuilder path = new StringBuilder();
+ path.append('/');
+ if (accessToken != null) {
+ path.append(accessToken);
+ path.append('/');
+ }
+ path.append(Bytes.toStringBinary(tableName));
+ path.append('/');
+ path.append("exists");
+ int code = 0;
+ for (int i = 0; i < maxRetries; i++) {
+ Response response = client.get(path.toString(), Constants.MIMETYPE_PROTOBUF);
+ code = response.getCode();
+ switch (code) {
+ case 200:
+ return true;
+ case 404:
+ return false;
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) { }
+ break;
+ default:
+ throw new IOException("get request to " + path.toString() + " returned " + code);
+ }
+ }
+ throw new IOException("get request to " + path.toString() + " timed out");
+ }
+
+ /**
+ * Creates a new table.
+ * @param desc table descriptor for table
+ * @throws IOException if a remote or network exception occurs
+ */
+ public void createTable(HTableDescriptor desc)
+ throws IOException {
+ TableSchemaModel model = new TableSchemaModel(desc);
+ StringBuilder path = new StringBuilder();
+ path.append('/');
+ if (accessToken != null) {
+ path.append(accessToken);
+ path.append('/');
+ }
+ path.append(desc.getTableName());
+ path.append('/');
+ path.append("schema");
+ int code = 0;
+ for (int i = 0; i < maxRetries; i++) {
+ Response response = client.put(path.toString(), Constants.MIMETYPE_PROTOBUF,
+ model.createProtobufOutput());
+ code = response.getCode();
+ switch (code) {
+ case 201:
+ return;
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) { }
+ break;
+ default:
+ throw new IOException("create request to " + path.toString() + " returned " + code);
+ }
+ }
+ throw new IOException("create request to " + path.toString() + " timed out");
+ }
+
+ /**
+ * Deletes a table.
+ * @param tableName name of table to delete
+ * @throws IOException if a remote or network exception occurs
+ */
+ public void deleteTable(final String tableName) throws IOException {
+ deleteTable(Bytes.toBytes(tableName));
+ }
+
+ /**
+ * Deletes a table.
+ * @param tableName name of table to delete
+ * @throws IOException if a remote or network exception occurs
+ */
+ public void deleteTable(final byte [] tableName) throws IOException {
+ StringBuilder path = new StringBuilder();
+ path.append('/');
+ if (accessToken != null) {
+ path.append(accessToken);
+ path.append('/');
+ }
+ path.append(Bytes.toStringBinary(tableName));
+ path.append('/');
+ path.append("schema");
+ int code = 0;
+ for (int i = 0; i < maxRetries; i++) {
+ Response response = client.delete(path.toString());
+ code = response.getCode();
+ switch (code) {
+ case 200:
+ return;
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) { }
+ break;
+ default:
+ throw new IOException("delete request to " + path.toString() + " returned " + code);
+ }
+ }
+ throw new IOException("delete request to " + path.toString() + " timed out");
+ }
+
+ /**
+ * @return a TableListModel of the tables the gateway knows about
+ * @throws IOException
+ * if the endpoint does not exist, there is a timeout, or some other
+ * general failure mode
+ */
+ public TableListModel getTableList() throws IOException {
+
+ StringBuilder path = new StringBuilder();
+ path.append('/');
+ if (accessToken != null) {
+ path.append(accessToken);
+ path.append('/');
+ }
+
+ int code = 0;
+ for (int i = 0; i < maxRetries; i++) {
+ Response response = client.get(path.toString(),
+ Constants.MIMETYPE_PROTOBUF);
+ code = response.getCode();
+ switch (code) {
+ case 200:
+ TableListModel t = new TableListModel();
+ return (TableListModel) t.getObjectFromMessage(response.getBody());
+ case 404:
+ throw new IOException("Table list not found");
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ }
+ break;
+ default:
+ throw new IOException("get request to " + path.toString()
+ + " request returned " + code);
+ }
+ }
+ throw new IOException("get request to " + path.toString()
+ + " request timed out");
+ }
+}
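To see the retry and status-code handling above in context, here is a hedged end-to-end sketch (not part of the commit) that drives table DDL through RemoteAdmin; the gateway address and table name are assumptions:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.rest.client.Client;
    import org.apache.hadoop.hbase.rest.client.Cluster;
    import org.apache.hadoop.hbase.rest.client.RemoteAdmin;

    public class RemoteAdminExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Client client = new Client(new Cluster().add("localhost", 8080));
        RemoteAdmin admin = new RemoteAdmin(client, conf);
        try {
          HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("demo"));
          desc.addFamily(new HColumnDescriptor("cf"));
          admin.createTable(desc);                            // PUT /demo/schema
          System.out.println(admin.isTableAvailable("demo")); // GET /demo/exists
          admin.deleteTable("demo");                          // DELETE /demo/schema
        } finally {
          client.shutdown();
        }
      }
    }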
http://git-wip-us.apache.org/repos/asf/hbase/blob/876617bd/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
new file mode 100644
index 0000000..fbede44
--- /dev/null
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
@@ -0,0 +1,825 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rest.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.rest.Constants;
+import org.apache.hadoop.hbase.rest.model.CellModel;
+import org.apache.hadoop.hbase.rest.model.CellSetModel;
+import org.apache.hadoop.hbase.rest.model.RowModel;
+import org.apache.hadoop.hbase.rest.model.ScannerModel;
+import org.apache.hadoop.hbase.rest.model.TableSchemaModel;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
+/**
+ * HTable interface to remote tables accessed via REST gateway
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class RemoteHTable implements HTableInterface {
+
+ private static final Log LOG = LogFactory.getLog(RemoteHTable.class);
+
+ final Client client;
+ final Configuration conf;
+ final byte[] name;
+ final int maxRetries;
+ final long sleepTime;
+
+ @SuppressWarnings("rawtypes")
+ protected String buildRowSpec(final byte[] row, final Map familyMap,
+ final long startTime, final long endTime, final int maxVersions) {
+ StringBuilder sb = new StringBuilder();
+ sb.append('/');
+ sb.append(Bytes.toStringBinary(name));
+ sb.append('/');
+ sb.append(Bytes.toStringBinary(row));
+ Set families = familyMap.entrySet();
+ if (!families.isEmpty()) { // entrySet() is never null; only append when families were specified
+ Iterator i = familyMap.entrySet().iterator();
+ sb.append('/');
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry)i.next();
+ Collection quals = (Collection)e.getValue();
+ if (quals == null || quals.isEmpty()) {
+ // this is an unqualified family. append the family name and NO ':'
+ sb.append(Bytes.toStringBinary((byte[])e.getKey()));
+ } else {
+ Iterator ii = quals.iterator();
+ while (ii.hasNext()) {
+ sb.append(Bytes.toStringBinary((byte[])e.getKey()));
+ sb.append(':');
+ Object o = ii.next();
+ // Puts use byte[] but Deletes use KeyValue
+ if (o instanceof byte[]) {
+ sb.append(Bytes.toStringBinary((byte[])o));
+ } else if (o instanceof KeyValue) {
+ sb.append(Bytes.toStringBinary(((KeyValue)o).getQualifier()));
+ } else {
+ throw new RuntimeException("object type not handled");
+ }
+ if (ii.hasNext()) {
+ sb.append(',');
+ }
+ }
+ }
+ if (i.hasNext()) {
+ sb.append(',');
+ }
+ }
+ }
+ if (startTime >= 0 && endTime != Long.MAX_VALUE) {
+ sb.append('/');
+ sb.append(startTime);
+ if (startTime != endTime) {
+ sb.append(',');
+ sb.append(endTime);
+ }
+ } else if (endTime != Long.MAX_VALUE) {
+ sb.append('/');
+ sb.append(endTime);
+ }
+ if (maxVersions > 1) {
+ sb.append("?v=");
+ sb.append(maxVersions);
+ }
+ return sb.toString();
+ }
+
+ protected String buildMultiRowSpec(final byte[][] rows, int maxVersions) {
+ StringBuilder sb = new StringBuilder();
+ sb.append('/');
+ sb.append(Bytes.toStringBinary(name));
+ sb.append("/multiget/");
+ if (rows == null || rows.length == 0) {
+ return sb.toString();
+ }
+ sb.append("?");
+ for(int i=0; i<rows.length; i++) {
+ byte[] rk = rows[i];
+ if (i != 0) {
+ sb.append('&');
+ }
+ sb.append("row=");
+ sb.append(Bytes.toStringBinary(rk));
+ }
+ sb.append("&v=");
+ sb.append(maxVersions);
+
+ return sb.toString();
+ }
+
+ protected Result[] buildResultFromModel(final CellSetModel model) {
+ List<Result> results = new ArrayList<Result>();
+ for (RowModel row: model.getRows()) {
+ List<Cell> kvs = new ArrayList<Cell>();
+ for (CellModel cell: row.getCells()) {
+ byte[][] split = KeyValue.parseColumn(cell.getColumn());
+ byte[] column = split[0];
+ byte[] qualifier = null;
+ if (split.length == 1) {
+ qualifier = HConstants.EMPTY_BYTE_ARRAY;
+ } else if (split.length == 2) {
+ qualifier = split[1];
+ } else {
+ throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
+ }
+ kvs.add(new KeyValue(row.getKey(), column, qualifier,
+ cell.getTimestamp(), cell.getValue()));
+ }
+ results.add(Result.create(kvs));
+ }
+ return results.toArray(new Result[results.size()]);
+ }
+
+ protected CellSetModel buildModelFromPut(Put put) {
+ RowModel row = new RowModel(put.getRow());
+ long ts = put.getTimeStamp();
+ for (List<Cell> cells: put.getFamilyCellMap().values()) {
+ for (Cell cell: cells) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ row.addCell(new CellModel(kv.getFamily(), kv.getQualifier(),
+ ts != HConstants.LATEST_TIMESTAMP ? ts : kv.getTimestamp(),
+ kv.getValue()));
+ }
+ }
+ CellSetModel model = new CellSetModel();
+ model.addRow(row);
+ return model;
+ }
+
+ /**
+ * Constructor
+ * @param client
+ * @param name
+ */
+ public RemoteHTable(Client client, String name) {
+ this(client, HBaseConfiguration.create(), Bytes.toBytes(name));
+ }
+
+ /**
+ * Constructor
+ * @param client
+ * @param conf
+ * @param name
+ */
+ public RemoteHTable(Client client, Configuration conf, String name) {
+ this(client, conf, Bytes.toBytes(name));
+ }
+
+ /**
+ * Constructor
+ * @param client
+ * @param conf
+ * @param name
+ */
+ public RemoteHTable(Client client, Configuration conf, byte[] name) {
+ this.client = client;
+ this.conf = conf;
+ this.name = name;
+ this.maxRetries = conf.getInt("hbase.rest.client.max.retries", 10);
+ this.sleepTime = conf.getLong("hbase.rest.client.sleep", 1000);
+ }
+
+ public byte[] getTableName() {
+ return name.clone();
+ }
+
+ @Override
+ public TableName getName() {
+ return TableName.valueOf(name);
+ }
+
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ public HTableDescriptor getTableDescriptor() throws IOException {
+ StringBuilder sb = new StringBuilder();
+ sb.append('/');
+ sb.append(Bytes.toStringBinary(name));
+ sb.append('/');
+ sb.append("schema");
+ for (int i = 0; i < maxRetries; i++) {
+ Response response = client.get(sb.toString(), Constants.MIMETYPE_PROTOBUF);
+ int code = response.getCode();
+ switch (code) {
+ case 200:
+ TableSchemaModel schema = new TableSchemaModel();
+ schema.getObjectFromMessage(response.getBody());
+ return schema.getTableDescriptor();
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) { }
+ break;
+ default:
+ throw new IOException("schema request returned " + code);
+ }
+ }
+ throw new IOException("schema request timed out");
+ }
+
+ public void close() throws IOException {
+ client.shutdown();
+ }
+
+ public Result get(Get get) throws IOException {
+ TimeRange range = get.getTimeRange();
+ String spec = buildRowSpec(get.getRow(), get.getFamilyMap(),
+ range.getMin(), range.getMax(), get.getMaxVersions());
+ if (get.getFilter() != null) {
+ LOG.warn("filters not supported on gets");
+ }
+ Result[] results = getResults(spec);
+ if (results.length > 0) {
+ if (results.length > 1) {
+ LOG.warn("too many results for get (" + results.length + ")");
+ }
+ return results[0];
+ } else {
+ return new Result();
+ }
+ }
+
+ public Result[] get(List<Get> gets) throws IOException {
+ byte[][] rows = new byte[gets.size()][];
+ int maxVersions = 1;
+ int count = 0;
+
+ for (Get g: gets) {
+ if (count == 0) {
+ maxVersions = g.getMaxVersions();
+ } else if (g.getMaxVersions() != maxVersions) {
+ LOG.warn("MaxVersions on Gets do not match, using the first in the list ("
+ + maxVersions + ")");
+ }
+ if (g.getFilter() != null) {
+ LOG.warn("filters not supported on gets");
+ }
+ rows[count] = g.getRow();
+ count++;
+ }
+
+ String spec = buildMultiRowSpec(rows, maxVersions);
+
+ return getResults(spec);
+ }
+
+ private Result[] getResults(String spec) throws IOException {
+ for (int i = 0; i < maxRetries; i++) {
+ Response response = client.get(spec, Constants.MIMETYPE_PROTOBUF);
+ int code = response.getCode();
+ switch (code) {
+ case 200:
+ CellSetModel model = new CellSetModel();
+ model.getObjectFromMessage(response.getBody());
+ Result[] results = buildResultFromModel(model);
+ if (results.length > 0) {
+ return results;
+ }
+ // fall through
+ case 404:
+ return new Result[0];
+
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) { }
+ break;
+ default:
+ throw new IOException("get request returned " + code);
+ }
+ }
+ throw new IOException("get request timed out");
+ }
+
+ public boolean exists(Get get) throws IOException {
+ LOG.warn("exists() is really get(), just use get()");
+ Result result = get(get);
+ return (result != null && !(result.isEmpty()));
+ }
+
+ /**
+ * exists(List) is really a list of get() calls. Just use get().
+ * @param gets list of Get to test for the existence
+ */
+ public Boolean[] exists(List<Get> gets) throws IOException {
+ LOG.warn("exists(List<Get>) is really list of get() calls, just use get()");
+ Boolean[] results = new Boolean[gets.size()];
+ for (int i = 0; i < results.length; i++) {
+ results[i] = exists(gets.get(i));
+ }
+ return results;
+ }
+
+ public void put(Put put) throws IOException {
+ CellSetModel model = buildModelFromPut(put);
+ StringBuilder sb = new StringBuilder();
+ sb.append('/');
+ sb.append(Bytes.toStringBinary(name));
+ sb.append('/');
+ sb.append(Bytes.toStringBinary(put.getRow()));
+ for (int i = 0; i < maxRetries; i++) {
+ Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF,
+ model.createProtobufOutput());
+ int code = response.getCode();
+ switch (code) {
+ case 200:
+ return;
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) { }
+ break;
+ default:
+ throw new IOException("put request failed with " + code);
+ }
+ }
+ throw new IOException("put request timed out");
+ }
+
+ public void put(List<Put> puts) throws IOException {
+ // This is a trick: the gateway accepts multiple rows in a cell set and
+ // ignores the row specification in the URI
+
+ // separate puts by row
+ TreeMap<byte[],List<Cell>> map =
+ new TreeMap<byte[],List<Cell>>(Bytes.BYTES_COMPARATOR);
+ for (Put put: puts) {
+ byte[] row = put.getRow();
+ List<Cell> cells = map.get(row);
+ if (cells == null) {
+ cells = new ArrayList<Cell>();
+ map.put(row, cells);
+ }
+ for (List<Cell> l: put.getFamilyCellMap().values()) {
+ cells.addAll(l);
+ }
+ }
+
+ // build the cell set
+ CellSetModel model = new CellSetModel();
+ for (Map.Entry<byte[], List<Cell>> e: map.entrySet()) {
+ RowModel row = new RowModel(e.getKey());
+ for (Cell cell: e.getValue()) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ row.addCell(new CellModel(kv));
+ }
+ model.addRow(row);
+ }
+
+ // build path for multiput
+ StringBuilder sb = new StringBuilder();
+ sb.append('/');
+ sb.append(Bytes.toStringBinary(name));
+ sb.append("/$multiput"); // can be any nonexistent row
+ for (int i = 0; i < maxRetries; i++) {
+ Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF,
+ model.createProtobufOutput());
+ int code = response.getCode();
+ switch (code) {
+ case 200:
+ return;
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) { }
+ break;
+ default:
+ throw new IOException("multiput request failed with " + code);
+ }
+ }
+ throw new IOException("multiput request timed out");
+ }
+
+ public void delete(Delete delete) throws IOException {
+ String spec = buildRowSpec(delete.getRow(), delete.getFamilyCellMap(),
+ delete.getTimeStamp(), delete.getTimeStamp(), 1);
+ for (int i = 0; i < maxRetries; i++) {
+ Response response = client.delete(spec);
+ int code = response.getCode();
+ switch (code) {
+ case 200:
+ return;
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) { }
+ break;
+ default:
+ throw new IOException("delete request failed with " + code);
+ }
+ }
+ throw new IOException("delete request timed out");
+ }
+
+ public void delete(List<Delete> deletes) throws IOException {
+ for (Delete delete: deletes) {
+ delete(delete);
+ }
+ }
+
+ public void flushCommits() throws IOException {
+ // no-op
+ }
+
+ class Scanner implements ResultScanner {
+
+ String uri;
+
+ public Scanner(Scan scan) throws IOException {
+ ScannerModel model;
+ try {
+ model = ScannerModel.fromScan(scan);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append('/');
+ sb.append(Bytes.toStringBinary(name));
+ sb.append('/');
+ sb.append("scanner");
+ for (int i = 0; i < maxRetries; i++) {
+ Response response = client.post(sb.toString(),
+ Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
+ int code = response.getCode();
+ switch (code) {
+ case 201:
+ uri = response.getLocation();
+ return;
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ // restore the interrupt flag instead of swallowing it
+ Thread.currentThread().interrupt();
+ }
+ break;
+ default:
+ throw new IOException("scan request failed with " + code);
+ }
+ }
+ throw new IOException("scan request timed out");
+ }
+
+ @Override
+ public Result[] next(int nbRows) throws IOException {
+ StringBuilder sb = new StringBuilder(uri);
+ sb.append("?n=");
+ sb.append(nbRows);
+ for (int i = 0; i < maxRetries; i++) {
+ Response response = client.get(sb.toString(),
+ Constants.MIMETYPE_PROTOBUF);
+ int code = response.getCode();
+ switch (code) {
+ case 200:
+ CellSetModel model = new CellSetModel();
+ model.getObjectFromMessage(response.getBody());
+ return buildResultFromModel(model);
+ case 204:
+ case 206:
+ return null;
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ // restore the interrupt flag instead of swallowing it
+ Thread.currentThread().interrupt();
+ }
+ break;
+ default:
+ throw new IOException("scanner.next request failed with " + code);
+ }
+ }
+ throw new IOException("scanner.next request timed out");
+ }
+
+ @Override
+ public Result next() throws IOException {
+ Result[] results = next(1);
+ if (results == null || results.length < 1) {
+ return null;
+ }
+ return results[0];
+ }
+
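+ /**
+ * Iterator that reads one Result ahead, so hasNext() is answered from
+ * the cache. IOExceptions from the underlying scanner are logged and
+ * end the iteration.
+ */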
+ class Iter implements Iterator<Result> {
+
+ Result cache;
+
+ public Iter() {
+ try {
+ cache = Scanner.this.next();
+ } catch (IOException e) {
+ LOG.warn(StringUtils.stringifyException(e));
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return cache != null;
+ }
+
+ @Override
+ public Result next() {
+ Result result = cache;
+ try {
+ cache = Scanner.this.next();
+ } catch (IOException e) {
+ LOG.warn(StringUtils.stringifyException(e));
+ cache = null;
+ }
+ return result;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove() not supported");
+ }
+
+ }
+
+ @Override
+ public Iterator<Result> iterator() {
+ return new Iter();
+ }
+
+ @Override
+ public void close() {
+ try {
+ client.delete(uri);
+ } catch (IOException e) {
+ LOG.warn(StringUtils.stringifyException(e));
+ }
+ }
+
+ }
+
+ public ResultScanner getScanner(Scan scan) throws IOException {
+ return new Scanner(scan);
+ }
+
+ public ResultScanner getScanner(byte[] family) throws IOException {
+ Scan scan = new Scan();
+ scan.addFamily(family);
+ return new Scanner(scan);
+ }
+
+ public ResultScanner getScanner(byte[] family, byte[] qualifier)
+ throws IOException {
+ Scan scan = new Scan();
+ scan.addColumn(family, qualifier);
+ return new Scanner(scan);
+ }
+
+ public boolean isAutoFlush() {
+ return true;
+ }
+
+ public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
+ throw new IOException("getRowOrBefore not supported");
+ }
+
+ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+ byte[] value, Put put) throws IOException {
+ // append the cell to check; the gateway compares its value server-side
+ put.add(new KeyValue(row, family, qualifier, value));
+
+ CellSetModel model = buildModelFromPut(put);
+ StringBuilder sb = new StringBuilder();
+ sb.append('/');
+ sb.append(Bytes.toStringBinary(name));
+ sb.append('/');
+ sb.append(Bytes.toStringBinary(put.getRow()));
+ sb.append("?check=put");
+
+ for (int i = 0; i < maxRetries; i++) {
+ Response response = client.put(sb.toString(),
+ Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
+ int code = response.getCode();
+ switch (code) {
+ case 200:
+ return true;
+ case 304: // Not Modified: the check failed
+ return false;
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (final InterruptedException e) {
+ // restore the interrupt flag instead of swallowing it
+ Thread.currentThread().interrupt();
+ }
+ break;
+ default:
+ throw new IOException("checkAndPut request failed with " + code);
+ }
+ }
+ throw new IOException("checkAndPut request timed out");
+ }
+
+ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+ byte[] value, Delete delete) throws IOException {
+ Put put = new Put(row);
+ // append the cell to check; the gateway compares its value server-side
+ put.add(new KeyValue(row, family, qualifier, value));
+ CellSetModel model = buildModelFromPut(put);
+ StringBuilder sb = new StringBuilder();
+ sb.append('/');
+ sb.append(Bytes.toStringBinary(name));
+ sb.append('/');
+ sb.append(Bytes.toStringBinary(row));
+ sb.append("?check=delete");
+
+ for (int i = 0; i < maxRetries; i++) {
+ Response response = client.put(sb.toString(),
+ Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
+ int code = response.getCode();
+ switch (code) {
+ case 200:
+ return true;
+ case 304: // Not Modified: the check failed
+ return false;
+ case 509:
+ try {
+ Thread.sleep(sleepTime);
+ } catch (final InterruptedException e) {
+ // restore the interrupt flag instead of swallowing it
+ Thread.currentThread().interrupt();
+ }
+ break;
+ default:
+ throw new IOException("checkAndDelete request failed with " + code);
+ }
+ }
+ throw new IOException("checkAndDelete request timed out");
+ }
+
+ public Result increment(Increment increment) throws IOException {
+ throw new IOException("Increment not supported");
+ }
+
+ public Result append(Append append) throws IOException {
+ throw new IOException("Append not supported");
+ }
+
+ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+ long amount) throws IOException {
+ throw new IOException("incrementColumnValue not supported");
+ }
+
+ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+ long amount, Durability durability) throws IOException {
+ throw new IOException("incrementColumnValue not supported");
+ }
+
+ @Override
+ public void batch(List<? extends Row> actions, Object[] results) throws IOException {
+ throw new IOException("batch not supported");
+ }
+
+ @Override
+ public Object[] batch(List<? extends Row> actions) throws IOException {
+ throw new IOException("batch not supported");
+ }
+
+ @Override
+ public <R> void batchCallback(List<? extends Row> actions, Object[] results,
+ Batch.Callback<R> callback) throws IOException, InterruptedException {
+ throw new IOException("batchCallback not supported");
+ }
+
+ @Override
+ public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback)
+ throws IOException, InterruptedException {
+ throw new IOException("batchCallback not supported");
+ }
+
+ @Override
+ public CoprocessorRpcChannel coprocessorService(byte[] row) {
+ throw new UnsupportedOperationException("coprocessorService not implemented");
+ }
+
+ @Override
+ public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
+ byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
+ throws ServiceException, Throwable {
+ throw new UnsupportedOperationException("coprocessorService not implemented");
+ }
+
+ @Override
+ public <T extends Service, R> void coprocessorService(Class<T> service,
+ byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
+ throws ServiceException, Throwable {
+ throw new UnsupportedOperationException("coprocessorService not implemented");
+ }
+
+ @Override
+ public void mutateRow(RowMutations rm) throws IOException {
+ throw new IOException("atomicMutation not supported");
+ }
+
+ @Override
+ public void setAutoFlush(boolean autoFlush) {
+ throw new UnsupportedOperationException("setAutoFlush not implemented");
+ }
+
+ @Override
+ public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+ throw new UnsupportedOperationException("setAutoFlush not implemented");
+ }
+
+ @Override
+ public void setAutoFlushTo(boolean autoFlush) {
+ throw new UnsupportedOperationException("setAutoFlushTo not implemented");
+ }
+
+ @Override
+ public long getWriteBufferSize() {
+ throw new UnsupportedOperationException("getWriteBufferSize not implemented");
+ }
+
+ @Override
+ public void setWriteBufferSize(long writeBufferSize) throws IOException {
+ throw new IOException("setWriteBufferSize not supported");
+ }
+
+ @Override
+ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+ long amount, boolean writeToWAL) throws IOException {
+ throw new IOException("incrementColumnValue not supported");
+ }
+
+ @Override
+ public <R extends Message> Map<byte[], R> batchCoprocessorService(
+ Descriptors.MethodDescriptor method, Message request,
+ byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
+ throw new UnsupportedOperationException("batchCoprocessorService not implemented");
+ }
+
+ @Override
+ public <R extends Message> void batchCoprocessorService(
+ Descriptors.MethodDescriptor method, Message request,
+ byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
+ throws ServiceException, Throwable {
+ throw new UnsupportedOperationException("batchCoprocessorService not implemented");
+ }
+
+ @Override
+ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
+ byte[] value, RowMutations mutation) throws IOException {
+ throw new UnsupportedOperationException("checkAndMutate not implemented");
+ }
+}
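
For orientation, here is a minimal usage sketch of the remote table client above (the module's RemoteHTable). This is a sketch only: it assumes a REST gateway on localhost:8080 and an existing table "t1" with family "f"; all names are illustrative.

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.rest.client.Client;
import org.apache.hadoop.hbase.rest.client.Cluster;
import org.apache.hadoop.hbase.rest.client.RemoteHTable;
import org.apache.hadoop.hbase.util.Bytes;

public class RemoteHTableSketch {
  public static void main(String[] args) throws Exception {
    Cluster cluster = new Cluster();
    cluster.add("localhost", 8080);               // the REST gateway, not ZooKeeper
    Client client = new Client(cluster);
    RemoteHTable table = new RemoteHTable(client, "t1");
    try {
      Put put = new Put(Bytes.toBytes("row1"));   // becomes PUT /t1/row1
      put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
      table.put(put);

      Scan scan = new Scan();                     // becomes POST /t1/scanner,
      ResultScanner scanner = table.getScanner(scan); // then GET <uri>?n=...
      try {
        for (Result result : scanner) {
          System.out.println(Bytes.toStringBinary(result.getRow()));
        }
      } finally {
        scanner.close();                          // DELETE <scanner uri>
      }
    } finally {
      table.close();
    }
  }
}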
http://git-wip-us.apache.org/repos/asf/hbase/blob/876617bd/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java
new file mode 100644
index 0000000..871b646
--- /dev/null
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java
@@ -0,0 +1,155 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rest.client;
+
+import java.io.InputStream;
+
+import org.apache.commons.httpclient.Header;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * The HTTP result code, response headers, and body of an HTTP response.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class Response {
+ private int code;
+ private Header[] headers;
+ private byte[] body;
+ private InputStream stream;
+
+ /**
+ * Constructor
+ * @param code the HTTP response code
+ */
+ public Response(int code) {
+ this(code, null, null);
+ }
+
+ /**
+ * Constructor
+ * @param code the HTTP response code
+ * @param headers the HTTP response headers
+ */
+ public Response(int code, Header[] headers) {
+ this(code, headers, null);
+ }
+
+ /**
+ * Constructor
+ * @param code the HTTP response code
+ * @param headers the HTTP response headers
+ * @param body the response body, can be null
+ */
+ public Response(int code, Header[] headers, byte[] body) {
+ this.code = code;
+ this.headers = headers;
+ this.body = body;
+ }
+
+ /**
+ * Constructor
+ * @param code the HTTP response code
+ * @param headers the HTTP response headers
+ * @param body the response body, can be null
+ * @param in the input stream, if the response has one; otherwise null
+ */
+ public Response(int code, Header[] headers, byte[] body, InputStream in) {
+ this.code = code;
+ this.headers = headers;
+ this.body = body;
+ this.stream = in;
+ }
+
+ /**
+ * @return the HTTP response code
+ */
+ public int getCode() {
+ return code;
+ }
+
+ /**
+ * @return the input stream, or null if the response does not have one
+ */
+ public InputStream getStream() {
+ return this.stream;
+ }
+
+ /**
+ * @return the HTTP response headers
+ */
+ public Header[] getHeaders() {
+ return headers;
+ }
+
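+ /**
+ * @param key the header name (matched case-insensitively)
+ * @return the value of the first matching header, or null if none
+ */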
+ public String getHeader(String key) {
+ for (Header header: headers) {
+ if (header.getName().equalsIgnoreCase(key)) {
+ return header.getValue();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * @return the value of the Location header
+ */
+ public String getLocation() {
+ return getHeader("Location");
+ }
+
+ /**
+ * @return true if a response body was sent
+ */
+ public boolean hasBody() {
+ return body != null;
+ }
+
+ /**
+ * @return the HTTP response body
+ */
+ public byte[] getBody() {
+ return body;
+ }
+
+ /**
+ * @param code the HTTP response code
+ */
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+ /**
+ * @param headers the HTTP response headers
+ */
+ public void setHeaders(Header[] headers) {
+ this.headers = headers;
+ }
+
+ /**
+ * @param body the response body
+ */
+ public void setBody(byte[] body) {
+ this.body = body;
+ }
+}
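
A short sketch of how a caller typically consumes a Response. It assumes a gateway on localhost:8080; /version is the gateway's own version resource.

import org.apache.hadoop.hbase.rest.client.Client;
import org.apache.hadoop.hbase.rest.client.Cluster;
import org.apache.hadoop.hbase.rest.client.Response;

public class ResponseSketch {
  public static void main(String[] args) throws Exception {
    Cluster cluster = new Cluster();
    cluster.add("localhost", 8080);
    Client client = new Client(cluster);
    // Client.get fills in the code, headers, and body of the Response
    Response response = client.get("/version", "text/plain");
    if (response.getCode() == 200 && response.hasBody()) {
      System.out.println(new String(response.getBody()));
    } else {
      System.err.println("request failed with " + response.getCode());
    }
    client.shutdown();
  }
}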
http://git-wip-us.apache.org/repos/asf/hbase/blob/876617bd/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/AuthFilter.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/AuthFilter.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/AuthFilter.java
new file mode 100644
index 0000000..6d68cdd
--- /dev/null
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/AuthFilter.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.rest.filter;
+
+import static org.apache.hadoop.hbase.rest.Constants.REST_AUTHENTICATION_PRINCIPAL;
+import static org.apache.hadoop.hbase.rest.Constants.REST_DNS_INTERFACE;
+import static org.apache.hadoop.hbase.rest.Constants.REST_DNS_NAMESERVER;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
+
+public class AuthFilter extends AuthenticationFilter {
+ private static final Log LOG = LogFactory.getLog(AuthFilter.class);
+ private static final String REST_PREFIX = "hbase.rest.authentication.";
+ private static final int REST_PREFIX_LEN = REST_PREFIX.length();
+
+ /**
+ * Returns the configuration to be used by the authentication filter
+ * to initialize the authentication handler.
+ *
+ * This filter reads the HBase configuration and passes every property
+ * starting with REST_PREFIX on to the authentication handler, with the
+ * prefix stripped. This makes it possible to plug in different
+ * authentication handlers.
+ */
+ @Override
+ protected Properties getConfiguration(
+ String configPrefix, FilterConfig filterConfig) throws ServletException {
+ Properties props = super.getConfiguration(configPrefix, filterConfig);
+ // Set the cookie path to root '/' so it is used for all resources.
+ props.setProperty(AuthenticationFilter.COOKIE_PATH, "/");
+
+ Configuration conf = HBaseConfiguration.create();
+ for (Map.Entry<String, String> entry : conf) {
+ String name = entry.getKey();
+ if (name.startsWith(REST_PREFIX)) {
+ String value = entry.getValue();
+ if (name.equals(REST_AUTHENTICATION_PRINCIPAL)) {
+ try {
+ String machineName = Strings.domainNamePointerToHostName(
+ DNS.getDefaultHost(conf.get(REST_DNS_INTERFACE, "default"),
+ conf.get(REST_DNS_NAMESERVER, "default")));
+ value = SecurityUtil.getServerPrincipal(value, machineName);
+ } catch (IOException ie) {
+ throw new ServletException("Failed to retrieve server principal", ie);
+ }
+ }
+ LOG.debug("Setting property " + name + "=" + value);
+ name = name.substring(REST_PREFIX_LEN);
+ props.setProperty(name, value);
+ }
+ }
+ return props;
+ }
+}
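
For example, a Kerberos setup would add properties along these lines to hbase-site.xml (values are illustrative). The filter forwards them to the authentication handler as type, kerberos.principal, and kerberos.keytab, substituting _HOST in the principal with the resolved host name:

<property>
  <name>hbase.rest.authentication.type</name>
  <value>kerberos</value>
</property>
<property>
  <name>hbase.rest.authentication.kerberos.principal</name>
  <value>HTTP/_HOST@EXAMPLE.COM</value>
</property>
<property>
  <name>hbase.rest.authentication.kerberos.keytab</name>
  <value>/etc/security/keytabs/spnego.service.keytab</value>
</property>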
http://git-wip-us.apache.org/repos/asf/hbase/blob/876617bd/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPRequestStream.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPRequestStream.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPRequestStream.java
new file mode 100644
index 0000000..02957e9
--- /dev/null
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPRequestStream.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rest.filter;
+
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class GZIPRequestStream extends ServletInputStream {
+ private GZIPInputStream in;
+
+ public GZIPRequestStream(HttpServletRequest request) throws IOException {
+ this.in = new GZIPInputStream(request.getInputStream());
+ }
+
+ @Override
+ public int read() throws IOException {
+ return in.read();
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return in.read(b);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return in.read(b, off, len);
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/876617bd/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPRequestWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPRequestWrapper.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPRequestWrapper.java
new file mode 100644
index 0000000..361e442
--- /dev/null
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPRequestWrapper.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rest.filter;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletRequestWrapper;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class GZIPRequestWrapper extends HttpServletRequestWrapper {
+ private ServletInputStream is;
+ private BufferedReader reader;
+
+ public GZIPRequestWrapper(HttpServletRequest request) throws IOException {
+ super(request);
+ this.is = new GZIPRequestStream(request);
+ this.reader = new BufferedReader(new InputStreamReader(this.is));
+ }
+
+ @Override
+ public ServletInputStream getInputStream() throws IOException {
+ return is;
+ }
+
+ @Override
+ public BufferedReader getReader() throws IOException {
+ return reader;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/876617bd/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPResponseStream.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPResponseStream.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPResponseStream.java
new file mode 100644
index 0000000..cc74f9c
--- /dev/null
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPResponseStream.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rest.filter;
+
+import java.io.IOException;
+import java.util.zip.GZIPOutputStream;
+
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class GZIPResponseStream extends ServletOutputStream {
+ private HttpServletResponse response;
+ private GZIPOutputStream out;
+
+ public GZIPResponseStream(HttpServletResponse response) throws IOException {
+ this.response = response;
+ this.out = new GZIPOutputStream(response.getOutputStream());
+ response.addHeader("Content-Encoding", "gzip");
+ }
+
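+ // Called on a response buffer reset: remove the Content-Encoding header
+ // while the response is still uncommitted, then discard the gzip stream.
+ // The stream must not be written to after a reset.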
+ public void resetBuffer() {
+ if (out != null && !response.isCommitted()) {
+ response.setHeader("Content-Encoding", null);
+ }
+ out = null;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ out.write(b, off, len);
+ }
+
+ @Override
+ public void close() throws IOException {
+ finish();
+ out.close();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ public void finish() throws IOException {
+ out.finish();
+ }
+}
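
These stream wrappers are consumed by the module's GzipFilter. The following is a simplified sketch of that wiring, not the actual filter source: the real filter also handles header parsing details and finishes the gzip stream after the chain returns, and GZIPResponseWrapper is assumed here from the same package (it would hand out a GZIPResponseStream from getOutputStream()).

package org.apache.hadoop.hbase.rest.filter;

import java.io.IOException;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class SimplifiedGzipFilter implements Filter {
  @Override
  public void init(FilterConfig config) throws ServletException {
  }

  @Override
  public void doFilter(ServletRequest req, ServletResponse rsp, FilterChain chain)
      throws IOException, ServletException {
    HttpServletRequest request = (HttpServletRequest) req;
    HttpServletResponse response = (HttpServletResponse) rsp;
    String contentEncoding = request.getHeader("Content-Encoding");
    if (contentEncoding != null && contentEncoding.toLowerCase().contains("gzip")) {
      // transparently gunzip the request body
      request = new GZIPRequestWrapper(request);
    }
    String acceptEncoding = request.getHeader("Accept-Encoding");
    if (acceptEncoding != null && acceptEncoding.toLowerCase().contains("gzip")) {
      // gzip the response body (GZIPResponseWrapper assumed, see above)
      response = new GZIPResponseWrapper(response);
    }
    chain.doFilter(request, response);
  }

  @Override
  public void destroy() {
  }
}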