You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by sh...@apache.org on 2015/10/28 23:09:54 UTC
[04/15] incubator-hawq git commit: HAWQ-45. PXF Package Namespace change
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/io/Text.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/io/Text.java b/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/io/Text.java
deleted file mode 100644
index 2ce28ba..0000000
--- a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/io/Text.java
+++ /dev/null
@@ -1,379 +0,0 @@
-package com.pivotal.pxf.service.io;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.*;
-import java.util.Arrays;
-
-/**
- * Mutable, reusable holder for text stored as UTF-8 encoded bytes.
- * <p>
- * {@link #write(DataOutput)} emits the valid bytes as-is, without any length
- * prefix, and {@link #readFields(DataInput)} consumes input up to and
- * including the next line delimiter (or the end of the stream). The static
- * variable-length integer helpers such as {@link #readVLong(DataInput)} are
- * utilities only; this class's own serialization does not use them.
- * <p>
- * The array returned by {@link #getBytes()} may be longer than the content;
- * only its first {@link #getLength()} bytes are meaningful.
- */
-public class Text implements Writable {
-
-    private static final Log LOG = LogFactory.getLog(Text.class);
-    private static final char LINE_DELIMITER = '\n';
-    private static final int BUF_SIZE = 1024;
-    private static final int EOF = -1;
-    private static final byte[] EMPTY_BYTES = new byte[0];
-
-    // Coders are stateful and not thread-safe, hence one instance per thread.
-    private static final ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal<CharsetEncoder>() {
-        @Override
-        protected CharsetEncoder initialValue() {
-            return Charset.forName("UTF-8").newEncoder()
-                    .onMalformedInput(CodingErrorAction.REPORT)
-                    .onUnmappableCharacter(CodingErrorAction.REPORT);
-        }
-    };
-    private static final ThreadLocal<CharsetDecoder> DECODER_FACTORY = new ThreadLocal<CharsetDecoder>() {
-        @Override
-        protected CharsetDecoder initialValue() {
-            return Charset.forName("UTF-8").newDecoder()
-                    .onMalformedInput(CodingErrorAction.REPORT)
-                    .onUnmappableCharacter(CodingErrorAction.REPORT);
-        }
-    };
-
-    // Staging buffer for readFields(); allocated on first use so that every
-    // constructor yields an instance that can be deserialized into.
-    private byte[] buf;
-    // Number of bytes currently staged in buf.
-    int curLoc;
-    // Backing storage; only the first 'length' bytes are valid.
-    private byte[] bytes;
-    private int length;
-
-    /**
-     * Constructs an empty text.
-     */
-    public Text() {
-        bytes = EMPTY_BYTES;
-    }
-
-    /**
-     * Construct from a string.
-     *
-     * @param string input string
-     */
-    public Text(String string) {
-        set(string);
-    }
-
-    /**
-     * Construct from another text.
-     *
-     * @param utf8 text to copy
-     */
-    public Text(Text utf8) {
-        set(utf8);
-    }
-
-    /**
-     * Construct from a byte array.
-     *
-     * @param utf8 input byte array
-     */
-    public Text(byte[] utf8) {
-        set(utf8);
-    }
-
-    /**
-     * Tells whether the first byte of a variable-length encoded integer
-     * marks a negative value.
-     *
-     * @param value first byte of the encoding
-     * @return true if {@code value} is below -120 or in the range [-112, -1]
-     */
-    public static boolean isNegativeVInt(byte value) {
-        return value < -120 || (value >= -112 && value < 0);
-    }
-
-    /**
-     * Reads a variable-length encoded long. A single-byte encoding is returned
-     * as-is; otherwise the following bytes are assembled most significant
-     * byte first and bit-flipped when the first byte marks a negative value.
-     *
-     * @param stream input to read from
-     * @return the decoded value
-     * @throws IOException if reading from the stream fails
-     */
-    public static long readVLong(DataInput stream) throws IOException {
-        byte firstByte = stream.readByte();
-        int len = decodeVIntSize(firstByte);
-        if (len == 1) {
-            return firstByte;
-        }
-        long i = 0;
-        for (int idx = 0; idx < len - 1; idx++) {
-            byte b = stream.readByte();
-            i = i << 8;
-            i = i | (b & 0xFF);
-        }
-        return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
-    }
-
-    /**
-     * Computes the total encoded size, in bytes, of a variable-length integer
-     * from its first byte.
-     *
-     * @param value first byte of the encoding
-     * @return number of bytes in the encoding, including the first byte
-     */
-    public static int decodeVIntSize(byte value) {
-        if (value >= -112) {
-            return 1;
-        } else if (value < -120) {
-            return -119 - value;
-        }
-        return -111 - value;
-    }
-
-    /**
-     * Converts the given range of a byte array to a String using the UTF-8
-     * encoding, replacing malformed input with the substitution character.
-     *
-     * @param utf8 UTF-8 encoded byte array
-     * @param start start point
-     * @param length number of bytes to decode
-     * @return decoded string
-     * @throws CharacterCodingException if the conversion failed
-     */
-    public static String decode(byte[] utf8, int start, int length)
-            throws CharacterCodingException {
-        return decode(ByteBuffer.wrap(utf8, start, length), true);
-    }
-
-    /**
-     * Converts the provided byte array to a String using the UTF-8 encoding. If
-     * <code>replace</code> is true, then malformed input is replaced with the
-     * substitution character, which is U+FFFD. Otherwise the method throws a
-     * MalformedInputException.
-     *
-     * @param utf8 UTF-8 encoded byte array
-     * @param start start point
-     * @param length length of array
-     * @param replace whether to replace malformed input with substitution
-     *            character
-     * @return decoded string
-     * @throws MalformedInputException if a malformed input is used
-     * @throws CharacterCodingException if the conversion failed
-     */
-    public static String decode(byte[] utf8, int start, int length,
-                                boolean replace)
-            throws CharacterCodingException {
-        return decode(ByteBuffer.wrap(utf8, start, length), replace);
-    }
-
-    private static String decode(ByteBuffer utf8, boolean replace)
-            throws CharacterCodingException {
-        CharsetDecoder decoder = DECODER_FACTORY.get();
-        if (replace) {
-            decoder.onMalformedInput(CodingErrorAction.REPLACE);
-            decoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
-        }
-        try {
-            return decoder.decode(utf8).toString();
-        } finally {
-            // The decoder is shared by all calls on this thread: always put
-            // it back to its default (REPORT), even if decoding failed.
-            if (replace) {
-                decoder.onMalformedInput(CodingErrorAction.REPORT);
-                decoder.onUnmappableCharacter(CodingErrorAction.REPORT);
-            }
-        }
-    }
-
-    /**
-     * Converts the provided String to bytes using the UTF-8 encoding. If the
-     * input is malformed, invalid chars are replaced by a default value.
-     *
-     * @param string string to encode
-     * @return ByteBuffer: bytes stores at ByteBuffer.array() and length is
-     *         ByteBuffer.limit()
-     * @throws CharacterCodingException if conversion failed
-     */
-    public static ByteBuffer encode(String string)
-            throws CharacterCodingException {
-        return encode(string, true);
-    }
-
-    /**
-     * Converts the provided String to bytes using the UTF-8 encoding. If
-     * <code>replace</code> is true, then malformed input is replaced with the
-     * substitution character, which is U+FFFD. Otherwise the method throws a
-     * MalformedInputException.
-     *
-     * @param string string to encode
-     * @param replace whether to replace malformed input with substitution character
-     * @return ByteBuffer: bytes stores at ByteBuffer.array() and length is
-     *         ByteBuffer.limit()
-     * @throws MalformedInputException if a malformed input is used
-     * @throws CharacterCodingException if the conversion failed
-     */
-    public static ByteBuffer encode(String string, boolean replace)
-            throws CharacterCodingException {
-        CharsetEncoder encoder = ENCODER_FACTORY.get();
-        if (replace) {
-            encoder.onMalformedInput(CodingErrorAction.REPLACE);
-            encoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
-        }
-        try {
-            return encoder.encode(CharBuffer.wrap(string.toCharArray()));
-        } finally {
-            // Same reasoning as in decode(): the encoder is reused per thread.
-            if (replace) {
-                encoder.onMalformedInput(CodingErrorAction.REPORT);
-                encoder.onUnmappableCharacter(CodingErrorAction.REPORT);
-            }
-        }
-    }
-
-    /**
-     * Returns the raw bytes; however, only data up to {@link #getLength()} is
-     * valid.
-     *
-     * @return raw bytes of byte array
-     */
-    public byte[] getBytes() {
-        return bytes;
-    }
-
-    /**
-     * Returns the number of valid bytes in the byte array.
-     *
-     * @return number of valid bytes in byte array
-     */
-    public int getLength() {
-        return length;
-    }
-
-    /**
-     * Sets to contain the contents of a string.
-     *
-     * @param string input string
-     */
-    public void set(String string) {
-        try {
-            ByteBuffer bb = encode(string, true);
-            // The encoder's array may be larger than the encoded content;
-            // limit() is the number of valid bytes.
-            bytes = bb.array();
-            length = bb.limit();
-        } catch (CharacterCodingException e) {
-            throw new RuntimeException("Should not have happened "
-                    + e.toString(), e);
-        }
-    }
-
-    /**
-     * Sets to a UTF-8 byte array.
-     *
-     * @param utf8 input UTF-8 byte array
-     */
-    public void set(byte[] utf8) {
-        set(utf8, 0, utf8.length);
-    }
-
-    /**
-     * Copies a text.
-     *
-     * @param other text object to copy.
-     */
-    public void set(Text other) {
-        set(other.getBytes(), 0, other.getLength());
-    }
-
-    /**
-     * Sets the Text to range of bytes.
-     *
-     * @param utf8 the data to copy from
-     * @param start the first position of the new string
-     * @param len the number of bytes of the new string
-     */
-    public void set(byte[] utf8, int start, int len) {
-        setCapacity(len, false);
-        System.arraycopy(utf8, start, bytes, 0, len);
-        this.length = len;
-    }
-
-    /**
-     * Appends a range of bytes to the end of the given text.
-     *
-     * @param utf8 the data to copy from
-     * @param start the first position to append from utf8
-     * @param len the number of bytes to append
-     */
-    public void append(byte[] utf8, int start, int len) {
-        setCapacity(length + len, true);
-        System.arraycopy(utf8, start, bytes, length, len);
-        length += len;
-    }
-
-    /**
-     * Clears the string to empty. The backing array is kept for reuse.
-     */
-    public void clear() {
-        length = 0;
-    }
-
-    /*
-     * Sets the capacity of this Text object to <em>at least</em>
-     * <code>len</code> bytes. If the current buffer is longer, then the
-     * capacity and existing content of the buffer are unchanged. If
-     * <code>len</code> is larger than the current capacity, the Text object's
-     * capacity is increased to match.
-     *
-     * @param len the number of bytes we need
-     *
-     * @param keepData should the old data be kept
-     */
-    private void setCapacity(int len, boolean keepData) {
-        if (bytes == null || bytes.length < len) {
-            byte[] newBytes = new byte[len];
-            if (bytes != null && keepData) {
-                System.arraycopy(bytes, 0, newBytes, 0, length);
-            }
-            bytes = newBytes;
-        }
-    }
-
-    /**
-     * Convert text back to string
-     *
-     * @see java.lang.Object#toString()
-     */
-    @Override
-    public String toString() {
-        try {
-            return decode(bytes, 0, length);
-        } catch (CharacterCodingException e) {
-            throw new RuntimeException("Should not have happened "
-                    + e.toString(), e);
-        }
-    }
-
-    /**
-     * Writes the valid bytes of this text to <code>out</code>, with no length
-     * prefix and no added delimiter.
-     *
-     * @param out output to write to
-     * @throws IOException if writing fails
-     */
-    @Override
-    public void write(DataOutput out) throws IOException {
-        byte[] data = getBytes();
-        out.write(data, 0, getLength());
-    }
-
-    /**
-     * Replaces the content of this text with the next line of the input: all
-     * bytes up to and including the next line delimiter, or up to the end of
-     * the stream if no delimiter is found.
-     *
-     * @param inputStream input to read from; must be a {@link DataInputStream}
-     * @throws IOException if reading fails
-     */
-    @Override
-    public void readFields(DataInput inputStream) throws IOException {
-        if (buf == null) {
-            buf = new byte[BUF_SIZE];
-        }
-        curLoc = 0;
-        clear();
-
-        DataInputStream in = (DataInputStream) inputStream;
-        // Keep the result of read() as an int: narrowing it to a byte before
-        // the EOF test would make a 0xFF data byte look like end of stream.
-        int c;
-        while ((c = in.read()) != EOF) {
-            buf[curLoc] = (byte) c;
-            curLoc++;
-
-            if (c == LINE_DELIMITER) {
-                LOG.trace("read one line, size " + curLoc);
-                break;
-            }
-
-            if (isBufferFull()) {
-                flushBuffer();
-            }
-        }
-
-        if (!isBufferEmpty()) {
-            // the buffer doesn't end with a line break.
-            if (c == EOF) {
-                LOG.warn("Stream ended without line break");
-            }
-            flushBuffer();
-        }
-    }
-
-    private boolean isBufferEmpty() {
-        return (curLoc == 0);
-    }
-
-    private boolean isBufferFull() {
-        return (curLoc == BUF_SIZE);
-    }
-
-    // Moves the staged bytes into the text content and resets the staging buffer.
-    private void flushBuffer() {
-        append(buf, 0, curLoc);
-        curLoc = 0;
-    }
-
-    /**
-     * Returns true iff <code>o</code> is a Text with the same contents. Only
-     * the valid bytes are compared; spare capacity of the backing arrays is
-     * ignored.
-     */
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof Text)) {
-            return false;
-        }
-        Text other = (Text) o;
-        if (length != other.length) {
-            return false;
-        }
-        for (int i = 0; i < length; i++) {
-            if (bytes[i] != other.bytes[i]) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * Hash of the valid bytes only, consistent with {@link #equals(Object)}.
-     */
-    @Override
-    public int hashCode() {
-        int result = 1;
-        for (int i = 0; i < length; i++) {
-            result = 31 * result + bytes[i];
-        }
-        return result;
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/io/Writable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/io/Writable.java b/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/io/Writable.java
deleted file mode 100644
index 3bb3d50..0000000
--- a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/io/Writable.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.pivotal.pxf.service.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * A serializable object which implements a simple, efficient, serialization
- * protocol, based on {@link DataInput} and {@link DataOutput}.
- * <p>The byte layout is not fixed by this interface; each implementation
- * defines what {@link #write(DataOutput)} emits and what
- * {@link #readFields(DataInput)} consumes.</p>
- */
-public interface Writable {
-
- /**
- * Serialize the fields of this object to <code>out</code>.
- *
- * @param out <code>DataOutput</code> to serialize this object into.
- * @throws IOException if an I/O error occurs while writing
- */
- void write(DataOutput out) throws IOException;
-
- /**
- * Deserialize the fields of this object from <code>in</code>, replacing
- * the state this object held before the call.
- * <p>For efficiency, implementations should attempt to re-use storage in the
- * existing object where possible.</p>
- *
- * @param in <code>DataInput</code> to deserialize this object from.
- * @throws IOException if an I/O error occurs while reading
- */
- void readFields(DataInput in) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/AnalyzerResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/AnalyzerResource.java b/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/AnalyzerResource.java
deleted file mode 100644
index 1531e7f..0000000
--- a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/AnalyzerResource.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package com.pivotal.pxf.service.rest;
-
-import com.pivotal.pxf.api.Analyzer;
-import com.pivotal.pxf.api.AnalyzerStats;
-import com.pivotal.pxf.service.AnalyzerFactory;
-import com.pivotal.pxf.service.utilities.ProtocolData;
-import com.pivotal.pxf.service.utilities.SecuredHDFS;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import javax.servlet.ServletContext;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.util.Map;
-
-/*
- * Class enhances the API of the WEBHDFS REST server.
- * Returns estimated statistics (block size, number of blocks, number of tuples) for a data resource.
- * Example for querying API ANALYZER from a web client
- * curl -i "http://localhost:50070/pxf/v2/Analyzer/getEstimatedStats?path=/dir1/dir2/*txt"
- * /pxf/ is made part of the path when there is a webapp by that name in tcServer.
- */
-@Path("/" + Version.PXF_PROTOCOL_VERSION + "/Analyzer/")
-public class AnalyzerResource extends RestResource {
-    private static final Log LOG = LogFactory.getLog(AnalyzerResource.class);
-
-    /*
-     * Nothing to initialize. The throws clause is kept so that existing
-     * callers that handle IOException continue to compile.
-     */
-    public AnalyzerResource() throws IOException {
-    }
-
-    /*
-     * Returns estimated statistics for the given path (data source).
-     * Example for querying API ANALYZER from a web client
-     * curl -i "http://localhost:50070/pxf/v2/Analyzer/getEstimatedStats?path=/dir1/dir2/*txt"
-     * A default answer, unless an analyzer implements getEstimatedStats, would be:
-     * {"PXFDataSourceStats":[{"blockSize":67108864,"numberOfBlocks":1000,"numberOfTuples":1000000}]}
-     * Currently only HDFS is implemented to calculate the block size and block number,
-     * and returns -1 for number of tuples.
-     * Example:
-     * {"PXFDataSourceStats":[{"blockSize":67108864,"numberOfBlocks":3,"numberOfTuples":-1}]}
-     *
-     * @param servletContext Servlet context contains attributes required by SecuredHDFS
-     * @param headers Holds HTTP headers from request
-     * @param path Holds URI path option used in this request
-     * @return HTTP 200 response whose entity is the statistics formatted as JSON
-     * @throws IllegalArgumentException if the request does not name an Analyzer class
-     */
-    @GET
-    @Path("getEstimatedStats")
-    @Produces("application/json")
-    public Response getEstimatedStats(@Context ServletContext servletContext,
-                                      @Context final HttpHeaders headers,
-                                      @QueryParam("path") String path) throws Exception {
-
-        if (LOG.isDebugEnabled()) {
-            StringBuilder startmsg = new StringBuilder("ANALYZER/getEstimatedStats started for path \"" + path + "\"");
-            if (headers != null) {
-                for (String header : headers.getRequestHeaders().keySet()) {
-                    startmsg.append(" Header: ").append(header).append(" Value: ").append(headers.getRequestHeader(header));
-                }
-            }
-            LOG.debug(startmsg);
-        }
-
-        /* Convert headers into a regular map */
-        Map<String, String> params = convertToCaseInsensitiveMap(headers.getRequestHeaders());
-
-        /* Store protocol level properties and verify */
-        final ProtocolData protData = new ProtocolData(params);
-        SecuredHDFS.verifyToken(protData, servletContext);
-
-        /*
-         * Analyzer is a special case in which it is hard to tell if user didn't
-         * specify one, or specified a profile that doesn't include one, or it's
-         * an actual protocol violation. Since we can only test protocol level
-         * logic, we assume (like before) that it's a user error, which is the
-         * case in most likelihood. When analyzer module is removed in the near
-         * future, this assumption will go away with it.
-         */
-        if (protData.getAnalyzer() == null) {
-            throw new IllegalArgumentException(
-                    "PXF 'Analyzer' class was not found. Please supply it in the LOCATION clause or use it in a PXF profile in order to run ANALYZE on this table");
-        }
-
-        /* Create an analyzer instance with API level parameters */
-        final Analyzer analyzer = AnalyzerFactory.create(protData);
-
-        /*
-         * Query the analyzer for the estimated statistics of the resource
-         * and format them as a JSON string
-         */
-        String jsonOutput = AnalyzerStats.dataToJSON(analyzer.getEstimatedStats(path));
-
-        return Response.ok(jsonOutput, MediaType.APPLICATION_JSON_TYPE).build();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/BridgeResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/BridgeResource.java b/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/BridgeResource.java
deleted file mode 100644
index 495a808..0000000
--- a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/BridgeResource.java
+++ /dev/null
@@ -1,151 +0,0 @@
-package com.pivotal.pxf.service.rest;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.servlet.ServletContext;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.StreamingOutput;
-
-import org.apache.catalina.connector.ClientAbortException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.pivotal.pxf.service.Bridge;
-import com.pivotal.pxf.service.ReadBridge;
-import com.pivotal.pxf.service.io.Writable;
-import com.pivotal.pxf.service.utilities.ProtocolData;
-import com.pivotal.pxf.service.utilities.SecuredHDFS;
-
-/*
- * This class handles the subpath /<version>/Bridge/ of this
- * REST component
- */
-@Path("/" + Version.PXF_PROTOCOL_VERSION + "/Bridge/")
-public class BridgeResource extends RestResource {
-
-    private static final Log LOG = LogFactory.getLog(BridgeResource.class);
-    /**
-     * Lock is needed here in the case of a non-thread-safe plugin.
-     * Using synchronized methods is not enough because the bridge work
-     * is called by jetty ({@link StreamingOutput}), after we are getting
-     * out of this class's context.
-     * <p>
-     * BRIDGE_LOCK is accessed through lock() and unlock() functions, based on the
-     * isThreadSafe parameter that is determined by the bridge.
-     */
-    private static final ReentrantLock BRIDGE_LOCK = new ReentrantLock();
-
-    public BridgeResource() {
-    }
-
-    /*
-     * Used to be HDFSReader. Creates a bridge instance and iterates over
-     * its records, printing it out to outgoing stream.
-     * Outputs GPDBWritable.
-     *
-     * Parameters come through HTTP header.
-     *
-     * @param servletContext Servlet context contains attributes required by SecuredHDFS
-     * @param headers Holds HTTP headers from request
-     */
-    @GET
-    @Produces(MediaType.APPLICATION_OCTET_STREAM)
-    public Response read(@Context final ServletContext servletContext,
-                         @Context HttpHeaders headers) throws Exception {
-        // Convert headers into a regular map
-        Map<String, String> params = convertToCaseInsensitiveMap(headers.getRequestHeaders());
-
-        LOG.debug("started with parameters: " + params);
-
-        ProtocolData protData = new ProtocolData(params);
-        SecuredHDFS.verifyToken(protData, servletContext);
-        Bridge bridge = new ReadBridge(protData);
-        String dataDir = protData.getDataSource();
-        // THREAD-SAFE parameter has precedence
-        boolean isThreadSafe = protData.isThreadSafe() && bridge.isThreadSafe();
-        LOG.debug("Request for " + dataDir + " will be handled " +
-                (isThreadSafe ? "without" : "with") + " synchronization");
-
-        return readResponse(bridge, protData, isThreadSafe);
-    }
-
-    /*
-     * Builds the response whose entity streams every record produced by the
-     * bridge. When threadSafe is false the whole iteration runs while holding
-     * BRIDGE_LOCK.
-     *
-     * @param bridge source of the records
-     * @param protData request properties; supplies fragment and data source for logging
-     * @param threadSafe whether the iteration may run without holding BRIDGE_LOCK
-     */
-    Response readResponse(final Bridge bridge, ProtocolData protData, final boolean threadSafe) throws Exception {
-        final int fragment = protData.getDataFragment();
-        final String dataDir = protData.getDataSource();
-
-        // Creating an internal streaming class
-        // which will iterate the records and put them on the
-        // output stream
-        final StreamingOutput streaming = new StreamingOutput() {
-            @Override
-            public void write(final OutputStream out) throws IOException, WebApplicationException {
-                long recordCount = 0;
-
-                if (!threadSafe) {
-                    lock(dataDir);
-                }
-                try {
-
-                    if (!bridge.beginIteration()) {
-                        return;
-                    }
-
-                    Writable record;
-                    DataOutputStream dos = new DataOutputStream(out);
-                    LOG.debug("Starting streaming fragment " + fragment + " of resource " + dataDir);
-                    while ((record = bridge.getNext()) != null) {
-                        record.write(dos);
-                        ++recordCount;
-                    }
-                    LOG.debug("Finished streaming fragment " + fragment + " of resource " + dataDir + ", " + recordCount + " records.");
-                } catch (ClientAbortException e) {
-                    // Occurs whenever client (HAWQ) decides the end the connection
-                    LOG.error("Remote connection closed by HAWQ", e);
-                } catch (Exception e) {
-                    LOG.error("Exception thrown when streaming", e);
-                    // Keep the original exception as the cause so that its
-                    // type and stack trace are not lost to the caller.
-                    throw new IOException(e.getMessage(), e);
-                } finally {
-                    LOG.debug("Stopped streaming fragment " + fragment + " of resource " + dataDir + ", " + recordCount + " records.");
-                    if (!threadSafe) {
-                        unlock(dataDir);
-                    }
-                }
-            }
-        };
-
-        return Response.ok(streaming, MediaType.APPLICATION_OCTET_STREAM).build();
-    }
-
-    /**
-     * Lock BRIDGE_LOCK
-     *
-     * @param path path for the request, used for logging.
-     */
-    private void lock(String path) {
-        LOG.trace("Locking BridgeResource for " + path);
-        BRIDGE_LOCK.lock();
-        LOG.trace("Locked BridgeResource for " + path);
-    }
-
-    /**
-     * Unlock BRIDGE_LOCK
-     *
-     * @param path path for the request, used for logging.
-     */
-    private void unlock(String path) {
-        LOG.trace("Unlocking BridgeResource for " + path);
-        BRIDGE_LOCK.unlock();
-        LOG.trace("Unlocked BridgeResource for " + path);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/ClusterNodesResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/ClusterNodesResource.java b/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/ClusterNodesResource.java
deleted file mode 100644
index 2fd2207..0000000
--- a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/ClusterNodesResource.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package com.pivotal.pxf.service.rest;
-
-import org.apache.catalina.connector.ClientAbortException;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-
-/*
- * Class enhances the API of the HBASE rest server.
- * Example for querying API getClusterNodesInfo from a web client
- * curl "http://localhost:50070/pxf/v2/HadoopCluster/getNodesInfo"
- * /pxf/ is made part of the path when there is a webapp by that name in tcServer.
- */
-@Path("/" + Version.PXF_PROTOCOL_VERSION + "/HadoopCluster/")
-public class ClusterNodesResource {
-    private static final Log LOG = LogFactory.getLog(ClusterNodesResource.class);
-
-    /*
-     * Nothing to initialize. The throws clause is kept so that existing
-     * callers that handle IOException continue to compile.
-     */
-    public ClusterNodesResource() throws IOException {
-    }
-
-
-    /*
-     * Function queries the Hadoop namenode with the getDataNodeStats API
-     * It gets the host's IP and REST port of every HDFS data node in the
-     * cluster. Then, it packs the results in JSON format and returns them
-     * as the entity of the HTTP response.
-     * Response Examples:
-     * a. When there are no datanodes - getDataNodeStats returns an empty array
-     *    {"regions":[]}
-     * b. When there are datanodes
-     *    {"regions":[{"host":"1.2.3.1","port":50075},{"host":"1.2.3.2","port":50075}]}
-     */
-    @GET
-    @Path("getNodesInfo")
-    @Produces("application/json")
-    public Response read() throws Exception {
-        LOG.debug("getNodesInfo started");
-        StringBuilder jsonOutput = new StringBuilder("{\"regions\":[");
-        try {
-            /* 1. Initialize the HADOOP client side API for a distributed file system */
-            Configuration conf = new Configuration();
-            FileSystem fs = FileSystem.get(conf);
-            DistributedFileSystem dfs = (DistributedFileSystem) fs;
-
-            /* 2. Query the namenode for the datanodes info.
-             * Only live nodes are returned - in accordance with the results returned by
-             * org.apache.hadoop.hdfs.tools.DFSAdmin#report().
-             */
-            DatanodeInfo[] liveNodes = dfs.getDataNodeStats(DatanodeReportType.LIVE);
-
-            /* 3. Pack the datanodes info in a JSON text format; the complete
-             * text becomes the response entity below.
-             */
-            String prefix = "";
-            for (DatanodeInfo node : liveNodes) {
-                verifyNode(node);
-                jsonOutput.append(prefix).append(writeNode(node)); // append one node to the JSON text
-                prefix = ",";
-            }
-            jsonOutput.append("]}");
-            LOG.debug("getNodesCluster output: " + jsonOutput);
-        } catch (NodeDataException e) {
-            LOG.error("Nodes verification failed", e);
-            throw e;
-        } catch (ClientAbortException e) {
-            LOG.error("Remote connection closed by HAWQ", e);
-            throw e;
-        } catch (java.io.IOException e) {
-            LOG.error("Unhandled exception thrown", e);
-            throw e;
-        }
-
-        return Response.ok(jsonOutput.toString(), MediaType.APPLICATION_JSON_TYPE).build();
-    }
-
-    /*
-     * Signals that a datanode reported an unusable IP address or port.
-     * Declared static so that the exception does not carry a hidden
-     * reference to the enclosing resource instance.
-     */
-    private static class NodeDataException extends java.io.IOException {
-
-        private static final long serialVersionUID = 1L;
-
-        public NodeDataException(String paramString) {
-            super(paramString);
-        }
-    }
-
-    /*
-     * Checks that the node has a non-empty IP address and a positive info port.
-     *
-     * @param node datanode to check
-     * @throws NodeDataException if the IP address is empty or the port is not positive
-     */
-    private void verifyNode(DatanodeInfo node) throws NodeDataException {
-        int port = node.getInfoPort();
-        String ip = node.getIpAddr();
-
-        if (StringUtils.isEmpty(ip)) {
-            throw new NodeDataException("Invalid IP: " + ip + " (Node " + node + ")");
-        }
-
-        if (port <= 0) {
-            throw new NodeDataException("Invalid port: " + port + " (Node " + node + ")");
-        }
-    }
-
-    /*
-     * Formats one node as a JSON object with "host" and "port" members.
-     *
-     * @param node datanode to format
-     * @return JSON text for the node
-     */
-    String writeNode(DatanodeInfo node) throws java.io.IOException {
-        return "{\"host\":\"" + node.getIpAddr() + "\",\"port\":" + node.getInfoPort() + "}";
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/FragmenterResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/FragmenterResource.java b/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/FragmenterResource.java
deleted file mode 100644
index 650414f..0000000
--- a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/FragmenterResource.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package com.pivotal.pxf.service.rest;
-
-import com.pivotal.pxf.api.Fragment;
-import com.pivotal.pxf.api.Fragmenter;
-import com.pivotal.pxf.service.FragmenterFactory;
-import com.pivotal.pxf.service.FragmentsResponse;
-import com.pivotal.pxf.service.FragmentsResponseFormatter;
-import com.pivotal.pxf.service.utilities.ProtocolData;
-import com.pivotal.pxf.service.utilities.SecuredHDFS;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import javax.servlet.ServletContext;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/*
- * REST resource that adds fragment discovery to the WEBHDFS REST server API.
- * It reports the data fragments a data resource consists of, so that the resource can be processed in parallel.
- * Sample request from a web client:
- * curl -i "http://localhost:50070/pxf/v2/Fragmenter/getFragments?path=/dir1/dir2/*txt"
- * The /pxf/ segment is present when a webapp of that name is deployed in tcServer.
- */
-@Path("/" + Version.PXF_PROTOCOL_VERSION + "/Fragmenter/")
-public class FragmenterResource extends RestResource {
-    private Log logger;
-
-    public FragmenterResource() throws IOException {
-        logger = LogFactory.getLog(FragmenterResource.class);
-    }
-
-    /*
-     * Entry point for http://nn:port/pxf/vx/Fragmenter/getFragments?path=...
-     *
-     * @param servletContext Servlet context contains attributes required by SecuredHDFS
-     * @param headers Holds HTTP headers from request
-     * @param path Holds URI path option used in this request
-     */
-    @GET
-    @Path("getFragments")
-    @Produces("application/json")
-    public Response getFragments(@Context final ServletContext servletContext,
-                                 @Context final HttpHeaders headers,
-                                 @QueryParam("path") final String path) throws Exception {
-
-        if (logger.isDebugEnabled()) {
-            logger.debug(describeRequest(path, headers));
-        }
-
-        /* Protocol level properties come from the headers, looked up case-insensitively */
-        ProtocolData protData = new ProtocolData(
-                convertToCaseInsensitiveMap(headers.getRequestHeaders()));
-
-        /* A request without a fragmenter is reported as a protocol violation */
-        if (protData.getFragmenter() == null) {
-            protData.protocolViolation("fragmenter");
-        }
-        SecuredHDFS.verifyToken(protData, servletContext);
-
-        /* Instantiate the fragmenter with API level parameters and collect its fragments */
-        final Fragmenter fragmenter = FragmenterFactory.create(protData);
-        List<Fragment> fragments = fragmenter.getFragments();
-
-        FragmentsResponse entity = FragmentsResponseFormatter.formatResponse(fragments, path);
-        return Response.ok(entity, MediaType.APPLICATION_JSON_TYPE).build();
-    }
-
-    /*
-     * Builds the debug message listing the requested path and every request header with its values.
-     */
-    private StringBuilder describeRequest(String path, HttpHeaders headers) {
-        StringBuilder message = new StringBuilder("FRAGMENTER started for path \"" + path + "\"");
-        for (String name : headers.getRequestHeaders().keySet()) {
-            message.append(" Header: ");
-            message.append(name);
-            message.append(" Value: ");
-            message.append(headers.getRequestHeader(name));
-        }
-        return message;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/InvalidPathResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/InvalidPathResource.java b/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/InvalidPathResource.java
deleted file mode 100644
index 8a32b01..0000000
--- a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/InvalidPathResource.java
+++ /dev/null
@@ -1,129 +0,0 @@
-package com.pivotal.pxf.service.rest;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.ResponseBuilder;
-import javax.ws.rs.core.UriInfo;
-import java.io.IOException;
-
-
-/**
- * Holds the PXF protocol version string that the REST resources of this
- * package place at the start of their paths.
- */
-class Version {
-    static final String PXF_PROTOCOL_VERSION = "v13";
-}
-
-/**
- * Catch-all resource for request paths that are not defined by other resources.
- * NOTE: This resource must be accessible without any security checks
- * as it is used to verify proper load of the PXF webapp.
- *
- * The first element of every caught path is treated as the protocol version and
- * is compared with {@code Version.PXF_PROTOCOL_VERSION}.
- * The expected format of a path is "{@code http://<host>:<port>/pxf/<version>/<rest of path>}"
- *
- * The response is always a Server Error (500):
- * when the version differs from the supported one the message reports both versions,
- * otherwise the message reports an unknown path.
- */
-@Path("/")
-public class InvalidPathResource {
-    @Context
-    UriInfo rootUri;
-
-    private Log Log;
-
-    public InvalidPathResource() throws IOException {
-        super();
-        Log = LogFactory.getLog(InvalidPathResource.class);
-    }
-
-    /*
-     * GET on the bare /pxf/ path
-     */
-    @GET
-    @Path("/")
-    public Response noPathGet() throws Exception {
-        return noPath();
-    }
-
-    /*
-     * POST on the bare /pxf/ path
-     */
-    @POST
-    @Path("/")
-    public Response noPathPost() throws Exception {
-        return noPath();
-    }
-
-    /*
-     * Shared handler for the bare path: there is no version element to inspect,
-     * so the request is always reported as an unknown path.
-     */
-    private Response noPath() throws Exception {
-        return sendErrorMessage("Unknown path " + rootUri.getAbsolutePath());
-    }
-
-    /*
-     * GET on any path of pattern /pxf/*
-     */
-    @GET
-    @Path("/{path:.*}")
-    public Response wrongPathGet(@PathParam("path") String path) throws Exception {
-        return wrongPath(path);
-    }
-
-    /*
-     * POST on any path of pattern /pxf/*
-     */
-    @POST
-    @Path("/{path:.*}")
-    public Response wrongPathPost(@PathParam("path") String path) throws Exception {
-        return wrongPath(path);
-    }
-
-    /*
-     * Shared handler for /pxf/*: decides between a "wrong version" and an
-     * "unknown path" error based on the first path element.
-     */
-    private Response wrongPath(String path) throws Exception {
-
-        final String supported = Version.PXF_PROTOCOL_VERSION;
-        final String requested = parseVersion(path);
-
-        Log.debug("REST request: " + rootUri.getAbsolutePath() + ". " +
-                "Version " + requested + ", supported version is " + supported);
-
-        if (!requested.equals(supported)) {
-            return sendErrorMessage("Wrong version " + requested + ", supported version is " + supported);
-        }
-        return sendErrorMessage("Unknown path " + rootUri.getAbsolutePath());
-    }
-
-    /*
-     * Build a plain-text Server Error response carrying the given message
-     */
-    private Response sendErrorMessage(String message) {
-        return Response.serverError()
-                .entity(message)
-                .type(MediaType.TEXT_PLAIN_TYPE)
-                .build();
-    }
-
-    /*
-     * Extract the version element from the path.
-     * The absolute path is
-     * http://<host>:<port>/pxf/<version>/<rest of path>
-     *
-     * path - the path part after /pxf/
-     * returns the text before the first '/', or the whole path when it has no '/'
-     */
-    private String parseVersion(String path) {
-        int firstSlash = path.indexOf('/');
-        return (firstSlash < 0) ? path : path.substring(0, firstSlash);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/MetadataResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/MetadataResource.java b/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/MetadataResource.java
deleted file mode 100644
index 0e5252f..0000000
--- a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/MetadataResource.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package com.pivotal.pxf.service.rest;
-
-import java.io.IOException;
-
-import javax.servlet.ServletContext;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import org.apache.catalina.connector.ClientAbortException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.pivotal.pxf.api.Metadata;
-import com.pivotal.pxf.api.MetadataFetcher;
-import com.pivotal.pxf.service.MetadataFetcherFactory;
-import com.pivotal.pxf.service.MetadataResponseFormatter;
-
-/**
- * Class enhances the API of the WEBHDFS REST server.
- * Returns the metadata of a given hcatalog table.
- * <br>
- * Example for querying API FRAGMENTER from a web client:<br>
- * <code>curl -i "http://localhost:51200/pxf/v13/Metadata/getTableMetadata?table=t1"</code><br>
- * /pxf/ is made part of the path when there is a webapp by that name in tomcat.
- */
-@Path("/" + Version.PXF_PROTOCOL_VERSION + "/Metadata/")
-public class MetadataResource extends RestResource {
- private Log Log;
-
- public MetadataResource() throws IOException {
- Log = LogFactory.getLog(MetadataResource.class);
- }
-
- /**
- * This function queries the HiveMetaStore to get the given table's metadata:
- * Table name, field names, field types.
- * The types are converted from HCatalog types to HAWQ types.
- * Supported HCatalog types:
- * TINYINT, SMALLINT, INT, BIGINT, BOOLEAN, FLOAT, DOUBLE,
- * STRING, BINARY, TIMESTAMP, DATE, DECIMAL, VARCHAR, CHAR.
- * <br>
- * Unsupported types result in an error.
- * <br>
- * Response Examples:<br>
- * For a table <code>default.t1</code> with 2 fields (a int, b float) will be returned as:
- * <code>{"PXFMetadata":[{"table":{"dbName":"default","tableName":"t1"},"fields":[{"name":"a","type":"int"},{"name":"b","type":"float"}]}]}</code>
- *
- * @param servletContext servlet context
- * @param headers http headers
- * @param table HCatalog table name
- * @return JSON formatted response with metadata for given table
- * @throws Exception if connection to Hcatalog failed, table didn't exist or its type or fields are not supported
- */
- @GET
- @Path("getTableMetadata")
- @Produces("application/json")
- public Response read(@Context final ServletContext servletContext,
- @Context final HttpHeaders headers,
- @QueryParam("table") final String table) throws Exception {
- Log.debug("getTableMetadata started");
- String jsonOutput;
- try {
- // 1. start MetadataFetcher
- MetadataFetcher metadataFetcher =
- MetadataFetcherFactory.create("com.pivotal.pxf.plugins.hive.HiveMetadataFetcher"); //TODO: nhorn - 09-03-15 - pass as param
-
- // 2. get Metadata
- Metadata metadata = metadataFetcher.getTableMetadata(table);
-
- // 3. serialize to JSON
- jsonOutput = MetadataResponseFormatter.formatResponseString(metadata);
-
- Log.debug("getTableMetadata output: " + jsonOutput);
-
- } catch (ClientAbortException e) {
- Log.error("Remote connection closed by HAWQ", e);
- throw e;
- } catch (java.io.IOException e) {
- Log.error("Unhandled exception thrown", e);
- throw e;
- }
-
- return Response.ok(jsonOutput, MediaType.APPLICATION_JSON_TYPE).build();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/RestResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/RestResource.java b/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/RestResource.java
deleted file mode 100644
index a77c967..0000000
--- a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/RestResource.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package com.pivotal.pxf.service.rest;
-
-import javax.ws.rs.core.MultivaluedMap;
-
-import org.apache.commons.codec.CharEncoding;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.io.UnsupportedEncodingException;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Super of all PXF REST classes
- */
-public abstract class RestResource {
-
-    private static final Log Log = LogFactory.getLog(RestResource.class);
-
-    /**
-     * Converts the request headers multivalued map to a case-insensitive regular map
-     * by taking only first values and storing them in a CASE_INSENSITIVE_ORDER TreeMap.
-     * All values are converted from ISO_8859_1 (ISO-LATIN-1) to UTF_8, and every
-     * escaped double quote (<code>\"</code>) in a value is replaced by a plain double quote.
-     * Headers whose value list is null or empty, or whose first value is null, are skipped.
-     *
-     * @param requestHeaders request headers multi map.
-     * @return a regular case-insensitive map.
-     * @throws UnsupportedEncodingException if the named charsets ISO_8859_1 and UTF_8 are not supported
-     */
-    public Map<String, String> convertToCaseInsensitiveMap(MultivaluedMap<String, String> requestHeaders)
-            throws UnsupportedEncodingException {
-        Map<String, String> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
-        for (Map.Entry<String, List<String>> entry : requestHeaders.entrySet()) {
-            String key = entry.getKey();
-            List<String> values = entry.getValue();
-            // An empty list has no first value; get(0) would throw IndexOutOfBoundsException
-            if (values == null || values.isEmpty()) {
-                continue;
-            }
-            String value = values.get(0);
-            if (value == null) {
-                continue;
-            }
-            // converting to value UTF-8 encoding
-            value = new String(value.getBytes(CharEncoding.ISO_8859_1), CharEncoding.UTF_8);
-            if (Log.isTraceEnabled()) {
-                Log.trace("key: " + key + ". value: " + value);
-            }
-            result.put(key, value.replace("\\\"", "\""));
-        }
-        return result;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/ServletLifecycleListener.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/ServletLifecycleListener.java b/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/ServletLifecycleListener.java
deleted file mode 100644
index e8cde86..0000000
--- a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/ServletLifecycleListener.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package com.pivotal.pxf.service.rest;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import javax.servlet.ServletContextListener;
-import javax.servlet.ServletContextEvent;
-
-import com.pivotal.pxf.service.utilities.Log4jConfigure;
-import com.pivotal.pxf.service.utilities.SecureLogin;
-
-/**
- * Listener on lifecycle events of our webapp
- */
-public class ServletLifecycleListener implements ServletContextListener {
-
-    // Log under this class's own category, not under the servlet API interface it implements
-    private static final Log LOG = LogFactory.getLog(ServletLifecycleListener.class);
-
-    /**
-     * Called after the webapp has been initialized.
-     *
-     * 1. Initializes log4j.
-     * 2. Initiates a Kerberos login when Hadoop security is on.
-     *
-     * @param event the servlet context event, passed on to the log4j configuration
-     */
-    @Override
-    public void contextInitialized(ServletContextEvent event) {
-        // 1. Initialize log4j:
-        Log4jConfigure.configure(event);
-
-        LOG.info("webapp initialized");
-
-        // 2. Initiate secure login
-        SecureLogin.login();
-    }
-
-    /**
-     * Called before the webapp is about to go down
-     *
-     * @param event the servlet context event (not used)
-     */
-    @Override
-    public void contextDestroyed(ServletContextEvent event) {
-        LOG.info("webapp about to go down");
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/WritableResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/WritableResource.java b/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/WritableResource.java
deleted file mode 100644
index 317620d..0000000
--- a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/rest/WritableResource.java
+++ /dev/null
@@ -1,151 +0,0 @@
-package com.pivotal.pxf.service.rest;
-
-import java.io.DataInputStream;
-import java.io.InputStream;
-import java.util.Map;
-
-import javax.servlet.ServletContext;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import org.apache.catalina.connector.ClientAbortException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.pivotal.pxf.service.Bridge;
-import com.pivotal.pxf.service.WriteBridge;
-import com.pivotal.pxf.service.utilities.ProtocolData;
-import com.pivotal.pxf.service.utilities.SecuredHDFS;
-
-/*
- * Running this resource manually:
- *
- * run:
- curl -i -X post "http://localhost:50070/pxf/v5w/Writable/stream?path=/data/curl/curl`date \"+%h%d_%H%M%s\"`" \
- --header "X-GP-Accessor: TextFileWAccessor" \
- --header "X-GP-Resolver: TextWResolver" \
- --header "Content-Type:application/octet-stream" \
- --header "Expect: 100-continue" \
- --header "X-GP-ALIGNMENT: 4" \
- --header "X-GP-SEGMENT-ID: 0" \
- --header "X-GP-SEGMENT-COUNT: 3" \
- --header "X-GP-HAS-FILTER: 0" \
- --header "X-GP-FORMAT: TEXT" \
- --header "X-GP-URI: pxf://localhost:50070/data/curl/?Accessor=TextFileWAccessor&Resolver=TextWResolver" \
- --header "X-GP-URL-HOST: localhost" \
- --header "X-GP-URL-PORT: 50070" \
- --header "X-GP-ATTRS: 0" \
- --header "X-GP-DATA-DIR: data/curl/" \
- -d "data111" -d "data222"
-
- * result:
-
- HTTP/1.1 200 OK
- Content-Type: text/plain;charset=UTF-8
- Content-Type: text/plain
- Transfer-Encoding: chunked
- Server: Jetty(7.6.10.v20130312)
-
- wrote 15 bytes to curlAug11_17271376231245
-
- file content:
- bin/hdfs dfs -cat /data/curl/*45
- data111&data222
-
- */
-
-
-/*
- * REST resource serving the subpath /<version>/Writable/ of this
- * REST component
- */
-@Path("/" + Version.PXF_PROTOCOL_VERSION + "/Writable/")
-public class WritableResource extends RestResource{
-    private static final Log LOG = LogFactory.getLog(WritableResource.class);
-
-    public WritableResource() {
-    }
-
-    /*
-     * Entry point for http://nn:port/pxf/vx/Writable/stream?path=...
-     *
-     * Builds the protocol data from the request headers, verifies the token,
-     * and feeds the request body to a write bridge - under a class-wide lock
-     * unless both the request and the bridge report being thread safe.
-     *
-     * @param servletContext Servlet context contains attributes required by SecuredHDFS
-     * @param headers Holds HTTP headers from request
-     * @param path Holds URI path option used in this request
-     * @param inputStream stream of bytes to write from Hawq
-     */
-    @POST
-    @Path("stream")
-    @Consumes(MediaType.APPLICATION_OCTET_STREAM)
-    public Response stream(@Context final ServletContext servletContext,
-                           @Context HttpHeaders headers,
-                           @QueryParam("path") String path,
-                           InputStream inputStream) throws Exception {
-
-        // Flatten the headers into a case-insensitive single-valued map
-        Map<String, String> params = convertToCaseInsensitiveMap(headers.getRequestHeaders());
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("WritableResource started with parameters: " + params + " and write path: " + path);
-        }
-
-        ProtocolData protData = new ProtocolData(params);
-        protData.setDataSource(path);
-
-        SecuredHDFS.verifyToken(protData, servletContext);
-        Bridge bridge = new WriteBridge(protData);
-
-        // THREAD-SAFE parameter has precedence
-        boolean isThreadSafe = protData.isThreadSafe() && bridge.isThreadSafe();
-        String syncMode = isThreadSafe ? "without" : "with";
-        LOG.debug("Request for " + path + " handled " +
-                syncMode + " synchronization");
-
-        if (isThreadSafe) {
-            return writeResponse(bridge, path, inputStream);
-        }
-        return synchronizedWriteResponse(bridge, path, inputStream);
-    }
-
-    /*
-     * Same as writeResponse, but serialized on the class lock
-     */
-    private static synchronized Response synchronizedWriteResponse(Bridge bridge,
-                                                                   String path,
-                                                                   InputStream inputStream)
-            throws Exception {
-        return writeResponse(bridge, path, inputStream);
-    }
-
-    /*
-     * Pushes the request body through the bridge, counting successful setNext calls,
-     * and reports that count in the response. The input stream is always closed.
-     */
-    private static Response writeResponse(Bridge bridge,
-                                          String path,
-                                          InputStream inputStream) throws Exception {
-
-        // Open the output file
-        bridge.beginIteration();
-
-        DataInputStream dataStream = new DataInputStream(inputStream);
-        long totalWritten = 0;
-
-        try {
-            while (bridge.setNext(dataStream)) {
-                totalWritten++;
-            }
-        } catch (ClientAbortException e) {
-            // Not rethrown: the response below still reports what was written
-            LOG.debug("Remote connection closed by HAWQ", e);
-        } catch (Exception ex) {
-            LOG.debug("totalWritten so far " + totalWritten + " to " + path);
-            throw ex;
-        } finally {
-            inputStream.close();
-        }
-
-        String returnMsg = "wrote " + totalWritten + " bulks to " + path;
-        LOG.debug(returnMsg);
-
-        return Response.ok(returnMsg).build();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/CustomWebappLoader.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/CustomWebappLoader.java b/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/CustomWebappLoader.java
deleted file mode 100644
index cc78026..0000000
--- a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/CustomWebappLoader.java
+++ /dev/null
@@ -1,211 +0,0 @@
-package com.pivotal.pxf.service.utilities;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-
-import org.apache.catalina.LifecycleException;
-import org.apache.catalina.loader.WebappLoader;
-import org.apache.juli.logging.Log;
-import org.apache.juli.logging.LogFactory;
-
-/**
- * A WebappLoader that allows a customized classpath to be added through configuration in context xml.
- * Any additional classpath entry will be added to the default webapp classpath.
- *
- * <pre>
- * <Context>
- *   <Loader className="com.pivotal.pxf.service.utilities.CustomWebappLoader"
- *              classpathFiles="/somedir/classpathFile1;/somedir/classpathFile2"/>
- * </Context>
- * </pre>
- */
-public class CustomWebappLoader extends WebappLoader {
-
-    /**
-     * Because this class belongs in tcServer itself, logs go into tcServer's log facility that is separate
-     * from the web app's log facility.
-     *
-     * Logs are directed to catalina.log file. By default only INFO or higher messages are logged.
-     * To change log level, add the following line to {catalina.base}/conf/logging.properties
-     * <code>com.pivotal.pxf.level = FINE/INFO/WARNING</code> (FINE = debug).
-     */
-    private static final Log LOG = LogFactory.getLog(CustomWebappLoader.class);
-
-    /**
-     * Classpath files containing path entries, separated by new line.
-     * Globbing is supported for the file name.
-     * e.g:
-     * somedir
-     * anotherdir/somejar.jar
-     * anotherone/hadoop*.jar
-     * anotherone/pxf*[0-9].jar
-     * Unix wildcard convention can be used to match a number of files
-     * (e.g. <code>*</code>, <code>[0-9]</code>, <code>?</code>), but not a number of directories.
-     *
-     * The files specified under classpathFiles must exist - if they can't be read an exception will be thrown.
-     */
-    private String classpathFiles;
-    /**
-     * Secondary classpath files - if these files are unavailable only a warning will be logged.
-     */
-    private String secondaryClasspathFiles;
-
-    /**
-     * Constructs a WebappLoader with no defined parent class loader (actual parent will be the system class loader).
-     */
-    public CustomWebappLoader() {
-        super();
-    }
-
-    /**
-     * Constructs a WebappLoader with the specified class loader to be defined as the parent for this ClassLoader.
-     *
-     * @param parent The parent class loader
-     */
-    public CustomWebappLoader(ClassLoader parent) {
-        super(parent);
-    }
-
-    /**
-     * <code>classpathFiles</code> attribute is automatically set from the context xml file.
-     *
-     * @param classpathFiles Files separated by <code>;</code>, each of which contains a new line separated list of path entries.
-     */
-    public void setClasspathFiles(String classpathFiles) {
-        this.classpathFiles = classpathFiles;
-    }
-
-    /**
-     * <code>secondaryClasspathFiles</code> attribute is automatically set from the context xml file.
-     *
-     * @param secondaryClasspathFiles Files separated by <code>;</code>, each of which contains a new line separated list of path entries.
-     */
-    public void setSecondaryClasspathFiles(String secondaryClasspathFiles) {
-        this.secondaryClasspathFiles = secondaryClasspathFiles;
-    }
-
-    /**
-     * Implements {@link org.apache.catalina.util.LifecycleBase#startInternal()}.
-     * Adds the repositories listed in the primary and secondary classpath files
-     * before delegating to the parent implementation.
-     *
-     * @throws LifecycleException if this component detects a fatal error that prevents this component from being used.
-     */
-    @Override
-    protected void startInternal() throws LifecycleException {
-
-        addRepositories(classpathFiles, true);
-        addRepositories(secondaryClasspathFiles, false);
-
-        super.startInternal();
-    }
-
-    /**
-     * Reads every classpath file in the <code>;</code> separated list and adds each file
-     * matching one of its entries as a repository of this loader.
-     *
-     * @param classpathFiles <code>;</code> separated list of classpath files, null when the attribute was not configured
-     * @param throwException whether an unreadable classpath file is fatal
-     * @throws LifecycleException if a classpath file can't be read and throwException is true
-     */
-    private void addRepositories(String classpathFiles, boolean throwException) throws LifecycleException {
-
-        // The attribute is only set when present in the context xml; nothing to load otherwise
-        if (classpathFiles == null) {
-            LOG.debug("No classpath files configured, skipping");
-            return;
-        }
-
-        for (String classpathFile : classpathFiles.split(";")) {
-
-            String classpath = readClasspathFile(classpathFile, throwException);
-            if (classpath == null) {
-                continue;
-            }
-
-            ArrayList<String> classpathEntries = trimEntries(classpath.split("[\\r\\n]+"));
-            LOG.info("Classpath file " + classpathFile + " has " + classpathEntries.size() + " entries");
-
-            for (String entry : classpathEntries) {
-                LOG.debug("Trying to load entry " + entry);
-                int repositoriesCount = 0;
-                /*
-                 * For each entry, we look at the parent directory and try to match each of the files
-                 * inside it to the file name or pattern in the file name (the last part of the path).
-                 * E.g., for path '/some/path/with/pattern*', the parent directory will be '/some/path/with/'
-                 * and the file name will be 'pattern*'. Each file under that directory matching
-                 * this pattern will be added to the class loader repository.
-                 *
-                 * The entry is made absolute first, so that a single-element relative entry
-                 * (e.g. 'somedir') still has a parent directory to scan.
-                 */
-                Path pathEntry = Paths.get(entry).toAbsolutePath();
-                Path parentDir = pathEntry.getParent();
-                Path fileName = pathEntry.getFileName();
-                if (parentDir == null || fileName == null) {
-                    // Only a bare root has neither a parent nor a file name
-                    LOG.warn("Entry " + entry + " doesn't match any files");
-                    continue;
-                }
-
-                try (DirectoryStream<Path> repositories = Files.newDirectoryStream(parentDir,
-                        fileName.toString())) {
-                    for (Path repository : repositories) {
-                        if (addPathToRepository(repository, entry)) {
-                            repositoriesCount++;
-                        }
-                    }
-                } catch (IOException e) {
-                    LOG.warn("Failed to load entry " + entry + ": " + e);
-                }
-                if (repositoriesCount == 0) {
-                    LOG.warn("Entry " + entry + " doesn't match any files");
-                }
-                LOG.debug("Loaded " + repositoriesCount + " repositories from entry " + entry);
-            }
-        }
-    }
-
-    /**
-     * Reads the whole content of a classpath file.
-     *
-     * @param classpathFile path of the file to read
-     * @param throwException whether a read failure is fatal
-     * @return the file content, or null if it can't be read and throwException is false
-     * @throws LifecycleException if the file can't be read and throwException is true
-     */
-    private String readClasspathFile(String classpathFile, boolean throwException) throws LifecycleException {
-        String classpath = null;
-        try {
-            LOG.info("Trying to read classpath file " + classpathFile);
-            classpath = new String(Files.readAllBytes(Paths.get(classpathFile)));
-        } catch (IOException ioe) {
-            LOG.warn("Failed to read classpath file: " + ioe);
-            if (throwException) {
-                throw new LifecycleException("Failed to read classpath file: " + ioe, ioe);
-            }
-        }
-        return classpath;
-    }
-
-    /**
-     * Returns a list of valid classpath entries, excluding null, empty and comment lines.
-     * @param classpathEntries original entries
-     * @return valid entries
-     */
-    private ArrayList<String> trimEntries(String[] classpathEntries) {
-
-        ArrayList<String> trimmed = new ArrayList<>();
-        int line = 0;
-        for (String entry : classpathEntries) {
-
-            line++;
-            if (entry == null) {
-                LOG.debug("Skipping entry #" + line + " (null)");
-                continue;
-            }
-
-            entry = entry.trim();
-            if (entry.isEmpty() || entry.startsWith("#")) {
-                LOG.debug("Skipping entry #" + line + " (" + entry + ")");
-                continue;
-            }
-            trimmed.add(entry);
-        }
-        return trimmed;
-    }
-
-    /**
-     * Adds a single readable path as a repository of this loader.
-     * Failures are logged as warnings and never propagated.
-     *
-     * @param path path of the file to add
-     * @param entry classpath entry the path was matched from, used for logging only
-     * @return true if the path was added, false otherwise
-     */
-    private boolean addPathToRepository(Path path, String entry) {
-
-        try {
-            URI pathUri = path.toUri();
-            String pathUriStr = pathUri.toString();
-            File file = new File(pathUri);
-            if (!file.canRead()) {
-                throw new FileNotFoundException(pathUriStr + " cannot be read");
-            }
-            addRepository(pathUriStr);
-            LOG.debug("Repository " + pathUriStr + " added from entry " + entry);
-            return true;
-        } catch (Exception e) {
-            LOG.warn("Failed to load path " + path + " to repository: " + e);
-        }
-
-        return false;
-    }
-
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/Log4jConfigure.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/Log4jConfigure.java b/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/Log4jConfigure.java
deleted file mode 100644
index 47af63c..0000000
--- a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/Log4jConfigure.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package com.pivotal.pxf.service.utilities;
-
-import java.io.File;
-
-import javax.servlet.ServletContext;
-import javax.servlet.ServletContextEvent;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.log4j.PropertyConfigurator;
-
-/**
- * Configures log4j for the webapp from a properties file.
- */
-public class Log4jConfigure {
-
-    private static final Log LOG = LogFactory.getLog(Log4jConfigure.class);
-
-    /**
-     * Initializes log4j logging for the webapp.
-     *
-     * Reads log4j properties file location from log4jConfigLocation parameter in web.xml.
-     * When not using absolute path, the path starts from the webapp root directory.
-     * If the parameter is missing or the file can't be read, reverts to default
-     * configuration file under WEB-INF/classes/pxf-log4j.properties.
-     *
-     * @param event Servlet context, used to determine webapp root directory.
-     */
-    public static void configure(ServletContextEvent event) {
-
-        final String defaultLog4jLocation = "WEB-INF/classes/pxf-log4j.properties";
-
-        ServletContext context = event.getServletContext();
-        String webappRoot = context.getRealPath("") + File.separator;
-
-        // Start from the default and override it only with a configured, readable file
-        String log4jConfigLocation = webappRoot + defaultLog4jLocation;
-
-        // getInitParameter returns null when the parameter is not defined in web.xml
-        String configuredLocation = context.getInitParameter("log4jConfigLocation");
-        if (configuredLocation != null) {
-            if (!configuredLocation.startsWith(File.separator)) {
-                configuredLocation = webappRoot + configuredLocation;
-            }
-            if (new File(configuredLocation).canRead()) {
-                log4jConfigLocation = configuredLocation;
-            }
-        }
-
-        PropertyConfigurator.configure(log4jConfigLocation);
-        LOG.info("log4jConfigLocation = " + log4jConfigLocation);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/ProtocolData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/ProtocolData.java
deleted file mode 100644
index 5001c40..0000000
--- a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/ProtocolData.java
+++ /dev/null
@@ -1,374 +0,0 @@
-package com.pivotal.pxf.service.utilities;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import com.pivotal.pxf.api.OutputFormat;
-import com.pivotal.pxf.api.utilities.ColumnDescriptor;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.api.utilities.ProfilesConf;
-
-/**
- * Common configuration of all MetaData classes. Provides read-only access to
- * common parameters supplied using system properties.
- */
-public class ProtocolData extends InputData {
-
- private static final String TRUE_LCASE = "true";
- private static final String FALSE_LCASE = "false";
- private static final String PROP_PREFIX = "X-GP-";
- public static final int INVALID_SPLIT_IDX = -1;
-
- private static final Log LOG = LogFactory.getLog(ProtocolData.class);
-
- protected OutputFormat outputFormat;
- protected int port;
- protected String host;
- protected String profile;
- protected String token;
-
- /**
- * Constructs a ProtocolData. Parses X-GP-* configuration variables.
- *
- * @param paramsMap contains all query-specific parameters from Hawq
- */
- public ProtocolData(Map<String, String> paramsMap) {
-
- requestParametersMap = paramsMap;
- segmentId = getIntProperty("SEGMENT-ID");
- totalSegments = getIntProperty("SEGMENT-COUNT");
- filterStringValid = getBoolProperty("HAS-FILTER");
-
- if (filterStringValid) {
- filterString = getProperty("FILTER");
- }
-
- parseFormat(getProperty("FORMAT"));
-
- host = getProperty("URL-HOST");
- port = getIntProperty("URL-PORT");
-
- tupleDescription = new ArrayList<ColumnDescriptor>();
- recordkeyColumn = null;
- parseTupleDescription();
-
- /*
- * accessor - will throw exception from getPropery() if outputFormat is
- * BINARY and the user did not supply accessor=... or profile=...
- * resolver - will throw exception from getPropery() if outputFormat is
- * BINARY and the user did not supply resolver=... or profile=...
- */
- profile = getOptionalProperty("PROFILE");
- if (profile != null) {
- setProfilePlugins();
- }
- accessor = getProperty("ACCESSOR");
- resolver = getProperty("RESOLVER");
- analyzer = getOptionalProperty("ANALYZER");
- fragmenter = getOptionalProperty("FRAGMENTER");
- dataSource = getProperty("DATA-DIR");
-
- /* Kerberos token information */
- if (UserGroupInformation.isSecurityEnabled()) {
- token = getProperty("TOKEN");
- }
-
- parseFragmentMetadata();
- parseUserData();
- parseThreadSafe();
- parseRemoteCredentials();
-
- dataFragment = INVALID_SPLIT_IDX;
- parseDataFragment(getOptionalProperty("DATA-FRAGMENT"));
-
- // Store alignment for global use as a system property
- System.setProperty("greenplum.alignment", getProperty("ALIGNMENT"));
- }
-
- /**
- * Constructs an InputDataBuilder from a copy. Used to create from an
- * extending class.
- *
- * @param copy the input data to copy
- */
- public ProtocolData(ProtocolData copy) {
- this.requestParametersMap = copy.requestParametersMap;
- this.segmentId = copy.segmentId;
- this.totalSegments = copy.totalSegments;
- this.outputFormat = copy.outputFormat;
- this.host = copy.host;
- this.port = copy.port;
- this.fragmentMetadata = copy.fragmentMetadata;
- this.userData = copy.userData;
- this.tupleDescription = copy.tupleDescription;
- this.recordkeyColumn = copy.recordkeyColumn;
- this.filterStringValid = copy.filterStringValid;
- this.filterString = copy.filterString;
- this.dataSource = copy.dataSource;
- this.accessor = copy.accessor;
- this.resolver = copy.resolver;
- this.fragmenter = copy.fragmenter;
- this.analyzer = copy.analyzer;
- this.threadSafe = copy.threadSafe;
- this.remoteLogin = copy.remoteLogin;
- this.remoteSecret = copy.remoteSecret;
- this.token = copy.token;
- }
-
- public String getToken() {
- return token;
- }
-
- /**
- * Sets the requested profile plugins from profile file into
- * {@link #requestParametersMap}.
- */
- private void setProfilePlugins() {
- Map<String, String> pluginsMap = ProfilesConf.getProfilePluginsMap(profile);
- checkForDuplicates(pluginsMap, requestParametersMap);
- requestParametersMap.putAll(pluginsMap);
- }
-
- /**
- * Verifies there are no duplicates between parameters declared in the table
- * definition and parameters defined in a profile.
- *
- * The parameters' names are case insensitive.
- */
- private void checkForDuplicates(Map<String, String> plugins,
- Map<String, String> params) {
- List<String> duplicates = new ArrayList<>();
- for (String key : plugins.keySet()) {
- if (params.containsKey(key)) {
- duplicates.add(key);
- }
- }
-
- if (!duplicates.isEmpty()) {
- throw new IllegalArgumentException("Profile '" + profile
- + "' already defines: "
- + String.valueOf(duplicates).replace("X-GP-", ""));
- }
- }
-
- /**
- * Returns the request parameters.
- *
- * @return map of request parameters
- */
- public Map<String, String> getParametersMap() {
- return requestParametersMap;
- }
-
- /**
- * Throws an exception when the given property value is missing in request.
- *
- * @param property missing property name
- * @throws IllegalArgumentException throws an exception with the property
- * name in the error message
- */
- public void protocolViolation(String property) {
- String error = "Internal server error. Property \"" + property
- + "\" has no value in current request";
-
- LOG.error(error);
- throw new IllegalArgumentException(error);
- }
-
- /**
- * Returns the value to which the specified property is mapped in
- * {@link #requestParametersMap}.
- *
- * @param property the lookup property key
- * @throws IllegalArgumentException if property key is missing
- */
- private String getProperty(String property) {
- String result = requestParametersMap.get(PROP_PREFIX + property);
-
- if (result == null) {
- protocolViolation(property);
- }
-
- return result;
- }
-
- /**
- * Returns the optional property value. Unlike {@link #getProperty}, it will
- * not fail if the property is not found. It will just return null instead.
- *
- * @param property the lookup optional property
- * @return property value as a String
- */
- private String getOptionalProperty(String property) {
- return requestParametersMap.get(PROP_PREFIX + property);
- }
-
- /**
- * Returns a property value as an int type.
- *
- * @param property the lookup property
- * @return property value as an int type
- * @throws NumberFormatException if the value is missing or can't be
- * represented by an Integer
- */
- private int getIntProperty(String property) {
- return Integer.parseInt(getProperty(property));
- }
-
- /**
- * Returns a property value as boolean type. A boolean property is defined
- * as an int where 0 means false, and anything else true (like C).
- *
- * @param property the lookup property
- * @return property value as boolean
- * @throws NumberFormatException if the value is missing or can't be
- * represented by an Integer
- */
- private boolean getBoolProperty(String property) {
- return getIntProperty(property) != 0;
- }
-
- /**
- * Returns the current output format, either {@link OutputFormat#TEXT} or
- * {@link OutputFormat#BINARY}.
- *
- * @return output format
- */
- public OutputFormat outputFormat() {
- return outputFormat;
- }
-
- /**
- * Returns the server name providing the service.
- *
- * @return server name
- */
- public String serverName() {
- return host;
- }
-
- /**
- * Returns the server port providing the service.
- *
- * @return server port
- */
- public int serverPort() {
- return port;
- }
-
- /**
- * Sets the thread safe parameter. Default value - true.
- */
- private void parseThreadSafe() {
-
- threadSafe = true;
- String threadSafeStr = getOptionalProperty("THREAD-SAFE");
- if (threadSafeStr != null) {
- threadSafe = parseBooleanValue(threadSafeStr);
- }
- }
-
- private boolean parseBooleanValue(String threadSafeStr) {
-
- if (threadSafeStr.equalsIgnoreCase(TRUE_LCASE)) {
- return true;
- }
- if (threadSafeStr.equalsIgnoreCase(FALSE_LCASE)) {
- return false;
- }
- throw new IllegalArgumentException("Illegal boolean value '"
- + threadSafeStr + "'." + " Usage: [TRUE|FALSE]");
- }
-
- /**
- * Sets the format type based on the input string. Allowed values are:
- * "TEXT", "GPDBWritable".
- *
- * @param formatString format string
- */
- protected void parseFormat(String formatString) {
- switch (formatString) {
- case "TEXT":
- outputFormat = OutputFormat.TEXT;
- break;
- case "GPDBWritable":
- outputFormat = OutputFormat.BINARY;
- break;
- default:
- throw new IllegalArgumentException(
- "Wrong value for greenplum.format " + formatString);
- }
- }
-
- /*
- * Sets the tuple description for the record
- */
- void parseTupleDescription() {
- int columns = getIntProperty("ATTRS");
- for (int i = 0; i < columns; ++i) {
- String columnName = getProperty("ATTR-NAME" + i);
- int columnTypeCode = getIntProperty("ATTR-TYPECODE" + i);
- String columnTypeName = getProperty("ATTR-TYPENAME" + i);
-
- ColumnDescriptor column = new ColumnDescriptor(columnName,
- columnTypeCode, i, columnTypeName);
- tupleDescription.add(column);
-
- if (columnName.equalsIgnoreCase(ColumnDescriptor.RECORD_KEY_NAME)) {
- recordkeyColumn = column;
- }
- }
- }
-
- /**
- * Sets the index of the allocated data fragment
- *
- * @param fragment the allocated data fragment
- */
- protected void parseDataFragment(String fragment) {
-
- /*
- * Some resources don't require a fragment, hence the list can be empty.
- */
- if (StringUtils.isEmpty(fragment)) {
- return;
- }
- dataFragment = Integer.parseInt(fragment);
- }
-
- private void parseFragmentMetadata() {
- fragmentMetadata = parseBase64("FRAGMENT-METADATA",
- "Fragment metadata information");
- }
-
- private void parseUserData() {
- userData = parseBase64("FRAGMENT-USER-DATA", "Fragment user data");
- }
-
- private byte[] parseBase64(String key, String errName) {
- String encoded = getOptionalProperty(key);
- if (encoded == null) {
- return null;
- }
- if (!Base64.isArrayByteBase64(encoded.getBytes())) {
- throw new IllegalArgumentException(errName
- + " must be Base64 encoded." + "(Bad value: " + encoded
- + ")");
- }
- byte[] parsed = Base64.decodeBase64(encoded);
- LOG.debug("decoded " + key + ": " + new String(parsed));
- return parsed;
- }
-
- private void parseRemoteCredentials() {
- remoteLogin = getOptionalProperty("REMOTE-USER");
- remoteSecret = getOptionalProperty("REMOTE-PASS");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/SecureLogin.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/SecureLogin.java b/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/SecureLogin.java
deleted file mode 100644
index e5ad387..0000000
--- a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/SecureLogin.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package com.pivotal.pxf.service.utilities;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.SecurityUtil;
-
-/*
- * This class relies heavily on Hadoop API to
- * - Check need for secure login in Hadoop
- * - Parse and load .xml configuration file
- * - Do a Kerberos login with a kaytab file
- * - convert _HOST in Kerberos principal to current hostname
- *
- * It uses Hadoop Configuration to parse XML configuration files
- * It uses Hadoop Security to modify principal and perform the login
- *
- * The major limitation in this class is its dependency
- * on Hadoop. If Hadoop security is off, no login will be performed
- * regardless of connector being used.
- */
-public class SecureLogin {
- private static Log LOG = LogFactory.getLog(SecureLogin.class);
- private static final String CONFIG_KEY_SERVICE_KEYTAB = "pxf.service.kerberos.keytab";
- private static final String CONFIG_KEY_SERVICE_PRINCIPAL = "pxf.service.kerberos.principal";
-
- public static void login() {
- try {
- Configuration config = new Configuration();
- config.addResource("pxf-site.xml");
-
- SecurityUtil.login(config, CONFIG_KEY_SERVICE_KEYTAB, CONFIG_KEY_SERVICE_PRINCIPAL);
- } catch (Exception e)
- {
- LOG.error("PXF service login failed");
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/SecuredHDFS.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/SecuredHDFS.java b/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/SecuredHDFS.java
deleted file mode 100644
index d75c22c..0000000
--- a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/SecuredHDFS.java
+++ /dev/null
@@ -1,93 +0,0 @@
-package com.pivotal.pxf.service.utilities;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-
-import javax.servlet.ServletContext;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-/*
- * The class handles security functions for handling
- * secured HDFS
- */
-public class SecuredHDFS {
- private static final Log LOG = LogFactory.getLog(SecuredHDFS.class);
-
- /*
- * The function will get the token information from parameters and call
- * SecuredHDFS to verify the token.
- *
- * All token properties will be deserialized from string to a Token object
- *
- * @throws SecurityException Thrown when authentication fails
- */
- public static void verifyToken(ProtocolData protData, ServletContext context) {
- try {
- if (UserGroupInformation.isSecurityEnabled()) {
- Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
- String tokenString = protData.getToken();
- token.decodeFromUrlString(tokenString);
-
- verifyToken(token.getIdentifier(), token.getPassword(),
- token.getKind(), token.getService(), context);
- }
- } catch (IOException e) {
- throw new SecurityException("Failed to verify delegation token "
- + e, e);
- }
- }
-
- /*
- * The function will verify the token with NameNode if available and will
- * create a UserGroupInformation.
- *
- * Code in this function is copied from JspHelper.getTokenUGI
- *
- * @param identifier Delegation token identifier
- *
- * @param password Delegation token password
- *
- * @param servletContext Jetty servlet context which contains the NN address
- *
- * @throws SecurityException Thrown when authentication fails
- */
- private static void verifyToken(byte[] identifier, byte[] password,
- Text kind, Text service,
- ServletContext servletContext) {
- try {
- Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(
- identifier, password, kind, service);
-
- ByteArrayInputStream buf = new ByteArrayInputStream(
- token.getIdentifier());
- DataInputStream in = new DataInputStream(buf);
- DelegationTokenIdentifier id = new DelegationTokenIdentifier();
- id.readFields(in);
-
- final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(servletContext);
- if (nn != null) {
- nn.getNamesystem().verifyToken(id, token.getPassword());
- }
-
- UserGroupInformation userGroupInformation = id.getUser();
- userGroupInformation.addToken(token);
- LOG.debug("user " + userGroupInformation.getUserName() + " ("
- + userGroupInformation.getShortUserName()
- + ") authenticated");
-
- // re-login if necessary
- userGroupInformation.checkTGTAndReloginFromKeytab();
- } catch (IOException e) {
- throw new SecurityException("Failed to verify delegation token "
- + e, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/Utilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/Utilities.java b/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/Utilities.java
deleted file mode 100644
index 220e4b2..0000000
--- a/pxf/pxf-service/src/main/java/com/pivotal/pxf/service/utilities/Utilities.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package com.pivotal.pxf.service.utilities;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.pivotal.pxf.api.utilities.InputData;
-
-/**
- * Utilities class exposes helper method for PXF classes
- */
-public class Utilities {
- private static final Log LOG = LogFactory.getLog(Utilities.class);
-
- /**
- * Creates an object using the class name. The class name has to be a class
- * located in the webapp's CLASSPATH.
- *
- * @param confClass the class of the metaData used to initialize the
- * instance
- * @param className a class name to be initialized.
- * @param metaData input data used to initialize the class
- * @return Initialized instance of given className
- * @throws Exception throws exception if classname was not found in
- * classpath, didn't have expected constructor or failed to be
- * instantiated
- */
- public static Object createAnyInstance(Class<?> confClass,
- String className, InputData metaData)
- throws Exception {
- Class<?> cls = Class.forName(className);
- Constructor<?> con = cls.getConstructor(confClass);
- return instantiate(con, metaData);
- }
-
- /**
- * Creates an object using the class name with its default constructor. The
- * class name has to be a class located in the webapp's CLASSPATH.
- *
- * @param className a class name to be initialized
- * @return initialized instance of given className
- * @throws Exception throws exception if classname was not found in
- * classpath, didn't have expected constructor or failed to be
- * instantiated
- */
- public static Object createAnyInstance(String className) throws Exception {
- Class<?> cls = Class.forName(className);
- Constructor<?> con = cls.getConstructor();
- return instantiate(con);
- }
-
- private static Object instantiate(Constructor<?> con, Object... args)
- throws Exception {
- try {
- return con.newInstance(args);
- } catch (InvocationTargetException e) {
- /*
- * We are creating resolvers, accessors, fragmenters, etc. using the
- * reflection framework. If for example, a resolver, during its
- * instantiation - in the c'tor, will throw an exception, the
- * Resolver's exception will reach the Reflection layer and there it
- * will be wrapped inside an InvocationTargetException. Here we are
- * above the Reflection layer and we need to unwrap the Resolver's
- * initial exception and throw it instead of the wrapper
- * InvocationTargetException so that our initial Exception text will
- * be displayed in psql instead of the message:
- * "Internal Server Error"
- */
- throw (e.getCause() != null) ? new Exception(e.getCause()) : e;
- }
- }
-
- /**
- * Transforms a byte array into a string of octal codes in the form
- * \\xyz\\xyz
- *
- * We double escape each char because it is required in postgres bytea for
- * some bytes. In the minimum all non-printables, backslash, null and single
- * quote. Easier to just escape everything see
- * http://www.postgresql.org/docs/9.0/static/datatype-binary.html
- *
- * Octal codes must be padded to 3 characters (001, 012)
- *
- * @param bytes bytes to escape
- * @param sb octal codes of given bytes
- */
- public static void byteArrayToOctalString(byte[] bytes, StringBuilder sb) {
- if ((bytes == null) || (sb == null)) {
- return;
- }
-
- sb.ensureCapacity(sb.length()
- + (bytes.length * 5 /* characters per byte */));
- for (int b : bytes) {
- sb.append(String.format("\\\\%03o", b & 0xff));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AnalyzerFactory.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AnalyzerFactory.java
new file mode 100644
index 0000000..6784916
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AnalyzerFactory.java
@@ -0,0 +1,17 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hawq.pxf.api.Analyzer;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.service.utilities.Utilities;
+
+/*
+ * Factory class for creation of Analyzer objects. The actual Analyzer object is "hidden" behind
+ * an Analyzer abstract class which is returned by the AnalyzerFactory.
+ */
+public class AnalyzerFactory {
+ static public Analyzer create(InputData inputData) throws Exception {
+ String analyzerName = inputData.getAnalyzer();
+
+ return (Analyzer) Utilities.createAnyInstance(InputData.class, analyzerName, inputData);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
new file mode 100644
index 0000000..8743d87
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
@@ -0,0 +1,21 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hawq.pxf.service.io.Writable;
+
+import java.io.DataInputStream;
+
+/*
+ * Bridge interface - defines the interface of the Bridge classes.
+ * Any Bridge class acts as an iterator over Hadoop stored data, and
+ * should implement getNext (for reading) or setNext (for writing)
+ * for handling accessed data.
+ */
+public interface Bridge {
+ boolean beginIteration() throws Exception;
+
+ Writable getNext() throws Exception;
+
+ boolean setNext(DataInputStream inputStream) throws Exception;
+
+ boolean isThreadSafe();
+}