Posted to commits@hawq.apache.org by sh...@apache.org on 2015/11/03 01:36:12 UTC
[09/15] incubator-hawq git commit: HAWQ-45. PXF package namespace refactor
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
new file mode 100644
index 0000000..aa854fc
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
@@ -0,0 +1,231 @@
+package org.apache.hawq.pxf.plugins.hdfs.utilities;
+
+import org.apache.hawq.pxf.service.utilities.Utilities;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.*;
+import java.util.List;
+
+/**
+ * HdfsUtilities class exposes helper methods for PXF classes.
+ */
+public class HdfsUtilities {
+ private static Log Log = LogFactory.getLog(HdfsUtilities.class);
+ private static Configuration config = new Configuration();
+ private static CompressionCodecFactory factory = new CompressionCodecFactory(
+ config);
+
+ /**
+ * HDFS data sources are absolute data paths. This method ensures that
+ * dataSource begins with '/'.
+ *
+ * @param dataSource The HDFS path to a file or directory of interest.
+ * Retrieved from the client request.
+ * @return an absolute data path
+ */
+ public static String absoluteDataPath(String dataSource) {
+ return (dataSource.charAt(0) == '/') ? dataSource : "/" + dataSource;
+ }
+
+ /*
+ * Helper routine to get a compression codec class
+ */
+ private static Class<? extends CompressionCodec> getCodecClass(Configuration conf,
+ String name) {
+
+ Class<? extends CompressionCodec> codecClass;
+ try {
+ codecClass = conf.getClassByName(name).asSubclass(
+ CompressionCodec.class);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Compression codec " + name
+ + " was not found.", e);
+ }
+ return codecClass;
+ }
+
+ /**
+ * Helper routine to get compression codec through reflection.
+ *
+ * @param conf configuration used for reflection
+ * @param name codec name
+ * @return generated CompressionCodec
+ */
+ public static CompressionCodec getCodec(Configuration conf, String name) {
+ return ReflectionUtils.newInstance(getCodecClass(conf, name), conf);
+ }
+
+ /**
+ * Helper routine to get compression codec class by path (file suffix).
+ *
+ * @param path path of file to get codec for
+ * @return matching codec class for the path. null if no codec is needed.
+ */
+ private static Class<? extends CompressionCodec> getCodecClassByPath(String path) {
+
+ Class<? extends CompressionCodec> codecClass = null;
+ CompressionCodec codec = factory.getCodec(new Path(path));
+ if (codec != null) {
+ codecClass = codec.getClass();
+ }
+ Log.debug((codecClass == null ? "No codec" : "Codec " + codecClass)
+ + " was found for file " + path);
+ return codecClass;
+ }
+
+ /**
+ * Returns true if the needed codec is splittable. If no codec is needed
+ * returns true as well.
+ *
+ * @param path path of the file to be read
+ * @return whether the codec needed for reading the specified path is splittable.
+ */
+ public static boolean isSplittableCodec(Path path) {
+
+ final CompressionCodec codec = factory.getCodec(path);
+ if (null == codec) {
+ return true;
+ }
+
+ return codec instanceof SplittableCompressionCodec;
+ }
+
+ /**
+ * Checks whether requests should be handled in a single thread or not.
+ *
+ * @param dataDir hdfs path to the data source
+ * @param compCodec the fully qualified name of the compression codec
+ * @return whether the request can be run in multi-threaded mode.
+ */
+ public static boolean isThreadSafe(String dataDir, String compCodec) {
+
+ Class<? extends CompressionCodec> codecClass = (compCodec != null) ? HdfsUtilities.getCodecClass(
+ config, compCodec) : HdfsUtilities.getCodecClassByPath(dataDir);
+ /* bzip2 codec is not thread safe */
+ return (codecClass == null || !BZip2Codec.class.isAssignableFrom(codecClass));
+ }
+
+ /**
+ * Prepares a byte serialization of file split information (start, length,
+ * hosts) using {@link ObjectOutputStream}.
+ *
+ * @param fsp file split to be serialized
+ * @return byte serialization of fsp
+ * @throws IOException if I/O errors occur while writing to the underlying
+ * stream
+ */
+ public static byte[] prepareFragmentMetadata(FileSplit fsp)
+ throws IOException {
+ ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
+ ObjectOutputStream objectStream = new ObjectOutputStream(
+ byteArrayStream);
+ objectStream.writeLong(fsp.getStart());
+ objectStream.writeLong(fsp.getLength());
+ objectStream.writeObject(fsp.getLocations());
+
+ return byteArrayStream.toByteArray();
+ }
+
+ /**
+ * Parses fragment metadata and returns the matching {@link FileSplit}.
+ *
+ * @param inputData request input data
+ * @return FileSplit with fragment metadata
+ */
+ public static FileSplit parseFragmentMetadata(InputData inputData) {
+ try {
+ byte[] serializedLocation = inputData.getFragmentMetadata();
+ if (serializedLocation == null) {
+ throw new IllegalArgumentException(
+ "Missing fragment location information");
+ }
+
+ ByteArrayInputStream bytesStream = new ByteArrayInputStream(
+ serializedLocation);
+ ObjectInputStream objectStream = new ObjectInputStream(bytesStream);
+
+ long start = objectStream.readLong();
+ long end = objectStream.readLong();
+
+ String[] hosts = (String[]) objectStream.readObject();
+
+ FileSplit fileSplit = new FileSplit(new Path(
+ inputData.getDataSource()), start, end, hosts);
+
+ Log.debug("parsed file split: path " + inputData.getDataSource()
+ + ", start " + start + ", end " + end + ", hosts "
+ + ArrayUtils.toString(hosts));
+
+ return fileSplit;
+
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Exception while reading expected fragment metadata", e);
+ }
+ }
+
+ /**
+ * Accesses the Avro file through the "unsplittable" API just to get the
+ * schema. The splittable API (AvroInputFormat), which is the one we will be
+ * using to fetch the records, does not support getting the Avro schema yet.
+ *
+ * @param conf Hadoop configuration
+ * @param dataSource Avro file path (e.g. fileName.avro)
+ * @return the Avro schema
+ * @throws IOException if an I/O error occurred while accessing the Avro schema file
+ */
+ public static Schema getAvroSchema(Configuration conf, String dataSource)
+ throws IOException {
+ FsInput inStream = new FsInput(new Path(dataSource), conf);
+ DatumReader<GenericRecord> dummyReader = new GenericDatumReader<>();
+ DataFileReader<GenericRecord> dummyFileReader = new DataFileReader<>(
+ inStream, dummyReader);
+ Schema schema = dummyFileReader.getSchema();
+ dummyFileReader.close();
+ return schema;
+ }
+
+ /**
+ * Returns string serialization of list of fields. Fields of binary type
+ * (BYTEA) are converted to octal representation to make sure they will be
+ * relayed properly to the DB.
+ *
+ * @param complexRecord list of fields to be stringified
+ * @param delimiter delimiter between fields
+ * @return string of serialized fields using delimiter
+ */
+ public static String toString(List<OneField> complexRecord, String delimiter) {
+ StringBuilder buff = new StringBuilder();
+ String delim = ""; // first iteration has no delimiter
+ for (OneField complex : complexRecord) {
+ if (complex.type == DataType.BYTEA.getOID()) {
+ /** Serialize byte array as string */
+ buff.append(delim);
+ Utilities.byteArrayToOctalString((byte[]) complex.val, buff);
+ } else {
+ buff.append(delim).append(complex.val);
+ }
+ delim = delimiter;
+ }
+ return buff.toString();
+ }
+}
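
As an illustration of the codec helpers above (not part of the commit), the following sketch resolves a codec by name and checks splittability and thread safety. The file paths are hypothetical and a Hadoop client classpath is assumed.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;

    public class CodecHelpersSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();

            // Reflection-based lookup; an unknown class name throws IllegalArgumentException.
            CompressionCodec gzip =
                    HdfsUtilities.getCodec(conf, "org.apache.hadoop.io.compress.GzipCodec");
            System.out.println(gzip.getDefaultExtension()); // .gz

            // Splittability is decided from the file suffix: gzip is not splittable,
            // and a path with no known codec suffix is treated as splittable.
            System.out.println(HdfsUtilities.isSplittableCodec(new Path("/data/file.gz")));  // false
            System.out.println(HdfsUtilities.isSplittableCodec(new Path("/data/file.txt"))); // true

            // bzip2 is the one codec flagged as not thread safe.
            System.out.println(HdfsUtilities.isThreadSafe("/data/file.txt",
                    "org.apache.hadoop.io.compress.BZip2Codec")); // false
        }
    }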
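
Similarly, here is a minimal, self-contained sketch (not part of the commit) of the byte protocol behind prepareFragmentMetadata() and parseFragmentMetadata(): two longs (start, length) followed by the host array, written and read with Java object streams. The class name and sample values are made up.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    public class FragmentMetadataRoundTrip {
        public static void main(String[] args) throws Exception {
            // Write side: same field order as prepareFragmentMetadata().
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeLong(0L);                               // split start
            out.writeLong(1024L);                            // split length
            out.writeObject(new String[]{"host1", "host2"}); // split hosts
            out.flush();

            // Read side: same field order as parseFragmentMetadata().
            ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()));
            long start = in.readLong();
            long length = in.readLong();
            String[] hosts = (String[]) in.readObject();
            System.out.println(start + ".." + (start + length) + " on " + hosts.length + " hosts");
        }
    }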
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/PxfInputFormat.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/PxfInputFormat.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/PxfInputFormat.java
new file mode 100644
index 0000000..047a5c2
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/PxfInputFormat.java
@@ -0,0 +1,33 @@
+package org.apache.hawq.pxf.plugins.hdfs.utilities;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.*;
+
+import java.io.IOException;
+
+/**
+ * PxfInputFormat is not intended to read a specific format, hence it implements
+ * a dummy getRecordReader. Instead, its purpose is to apply
+ * FileInputFormat.getSplits from one point in PXF and get splits that are
+ * valid for the actual InputFormats, since all the InputFormats we use inherit
+ * FileInputFormat but do not override getSplits.
+ */
+public class PxfInputFormat extends FileInputFormat {
+
+ @Override
+ public RecordReader getRecordReader(InputSplit split,
+ JobConf conf,
+ Reporter reporter) throws IOException {
+ throw new UnsupportedOperationException("PxfInputFormat should not be used for reading data, but only for obtaining the splits of a file");
+ }
+
+ /*
+ * Return true if this file can be split.
+ */
+ @Override
+ protected boolean isSplitable(FileSystem fs, Path filename) {
+ return HdfsUtilities.isSplittableCodec(filename);
+ }
+
+}
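
To make the role of PxfInputFormat concrete, here is a hedged sketch (not part of the commit) that uses it only to compute splits through the inherited FileInputFormat.getSplits(); the input path is hypothetical and a configured HDFS client is assumed. Each resulting FileSplit can then be serialized with HdfsUtilities.prepareFragmentMetadata().

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hawq.pxf.plugins.hdfs.utilities.PxfInputFormat;

    public class SplitListingSketch {
        public static void main(String[] args) throws Exception {
            JobConf jobConf = new JobConf();
            FileInputFormat.setInputPaths(jobConf, new Path("/data/example"));

            // getRecordReader() would throw; only the inherited getSplits() is used.
            PxfInputFormat format = new PxfInputFormat();
            InputSplit[] splits = format.getSplits(jobConf, 1); // numSplits is only a hint

            for (InputSplit split : splits) {
                System.out.println(split); // each split is a FileSplit (path, start, length)
            }
        }
    }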
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
new file mode 100644
index 0000000..3529932
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
@@ -0,0 +1,265 @@
+package org.apache.hawq.pxf.plugins.hdfs.utilities;
+
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.*;
+
+import java.util.List;
+
+/**
+ * Adapter used for adding a recordkey field to a record's output {@code List<OneField>}.
+ */
+public class RecordkeyAdapter {
+ private Log Log;
+
+ /*
+ * We need to transform record keys to Java primitive types.
+ * Since the type of the key is the same throughout the file, we do the type resolution
+ * on the first call (for the first record) and then use a "Java variation on a function pointer"
+ * to do the extraction for the rest of the records.
+ */
+ private interface ValExtractor {
+ public Object get(Object key);
+ }
+
+ private ValExtractor extractor = null;
+
+ private interface ValConverter {
+ public Writable get(Object key);
+ }
+
+ private ValConverter converter = null;
+
+ /**
+ * Constructs a RecordkeyAdapter.
+ */
+ public RecordkeyAdapter() {
+ Log = LogFactory.getLog(RecordkeyAdapter.class);
+ }
+
+ /**
+ * Adds the recordkey to the end of the passed in recFields list.
+ * <p>
+ * This method also verifies cases in which record keys are not supported
+ * by the underlying source type, and therefore "illegally" requested.
+ *
+ * @param recFields existing list of record (non-key) fields and their values.
+ * @param input all input parameters coming from the client request
+ * @param onerow a row object which is used here in order to find out if
+ * the given type supports recordkeys or not.
+ * @return 0 if record key not needed, or 1 if record key was appended
+ * @throws NoSuchFieldException when the given record type does not support
+ * recordkeys
+ */
+ public int appendRecordkeyField(List<OneField> recFields,
+ InputData input,
+ OneRow onerow) throws NoSuchFieldException {
+
+ /*
+ * user did not request the recordkey field in the
+ * "create external table" statement
+ */
+ ColumnDescriptor recordkeyColumn = input.getRecordkeyColumn();
+ if (recordkeyColumn == null) {
+ return 0;
+ }
+
+ /*
+ * The recordkey was filled in the fileAccessor during execution of
+ * method readNextObject. The current accessor implementations are
+ * SequenceFileAccessor, LineBreakAccessor and AvroFileAccessor from
+ * HdfsSplittableDataAccessor and QuotedLineBreakAccessor from
+ * HdfsAtomicDataAccessor. For SequenceFileAccessor and LineBreakAccessor
+ * the recordkey is set, since it is returned by the
+ * SequenceFileRecordReader or LineRecordReader (for text files). But Avro
+ * files do not have keys, so the AvroRecordReader will not return a key
+ * and in this case recordkey will be null. If the user specified a
+ * recordkey attribute in the CREATE EXTERNAL TABLE statement and
+ * reads from an Avro file, we will throw an exception since the Avro
+ * file does not have keys. In the future, additional implementations of
+ * FileAccessors will have to set recordkey during readNextObject().
+ * Otherwise it is null by default and we will throw an exception here,
+ * that is if we get here... a careful user will not specify recordkey
+ * in the CREATE EXTERNAL statement and then we will leave this function
+ * one line above.
+ */
+ Object recordkey = onerow.getKey();
+ if (recordkey == null) {
+ throw new NoSuchFieldException("Value for field \"recordkey\" was requested but the queried HDFS resource type does not support key");
+ }
+
+ OneField oneField = new OneField();
+ oneField.type = recordkeyColumn.columnTypeCode();
+ oneField.val = extractVal(recordkey);
+ recFields.add(oneField);
+ return 1;
+ }
+
+ /*
+ * Extracts a Java primitive type value from the recordkey. If the key is a
+ * Writable implementation, we extract the value as a Java primitive. If the
+ * key is already a Java primitive, we return it as is. If it is an unknown
+ * type, we throw an exception.
+ */
+ private Object extractVal(Object key) {
+ if (extractor == null) {
+ extractor = InitializeExtractor(key);
+ }
+
+ return extractor.get(key);
+ }
+
+ /*
+ * Initialize the extractor object based on the type of the recordkey
+ */
+ private ValExtractor InitializeExtractor(Object key) {
+ if (key instanceof IntWritable) {
+ return new ValExtractor() {
+ @Override
+ public Object get(Object key) {
+ return ((IntWritable) key).get();
+ }
+ };
+ } else if (key instanceof ByteWritable) {
+ return new ValExtractor() {
+ @Override
+ public Object get(Object key) {
+ return ((ByteWritable) key).get();
+ }
+ };
+ } else if (key instanceof BooleanWritable) {
+ return new ValExtractor() {
+ @Override
+ public Object get(Object key) {
+ return ((BooleanWritable) key).get();
+ }
+ };
+ } else if (key instanceof DoubleWritable) {
+ return new ValExtractor() {
+ @Override
+ public Object get(Object key) {
+ return ((DoubleWritable) key).get();
+ }
+ };
+ } else if (key instanceof FloatWritable) {
+ return new ValExtractor() {
+ @Override
+ public Object get(Object key) {
+ return ((FloatWritable) key).get();
+ }
+ };
+ } else if (key instanceof LongWritable) {
+ return new ValExtractor() {
+ @Override
+ public Object get(Object key) {
+ return ((LongWritable) key).get();
+ }
+ };
+ } else if (key instanceof Text) {
+ return new ValExtractor() {
+ @Override
+ public Object get(Object key) {
+ return (key).toString();
+ }
+ };
+ } else if (key instanceof VIntWritable) {
+ return new ValExtractor() {
+ @Override
+ public Object get(Object key) {
+ return ((VIntWritable) key).get();
+ }
+ };
+ } else {
+ return new ValExtractor() {
+ @Override
+ public Object get(Object key) {
+ throw new UnsupportedOperationException("Unsupported recordkey data type " + key.getClass().getName());
+ }
+ };
+ }
+ }
+
+ /**
+ * Converts given key object to its matching Writable.
+ * Supported types: Integer, Byte, Boolean, Double, Float, Long, String.
+ * The type is only checked once based on the first key; all subsequent calls
+ * must be of the same type.
+ *
+ * @param key object to convert
+ * @return Writable object matching given key
+ */
+ public Writable convertKeyValue(Object key) {
+ if (converter == null) {
+ converter = initializeConverter(key);
+ Log.debug("converter initialized for type " + key.getClass() +
+ " (key value: " + key + ")");
+ }
+
+ return converter.get(key);
+ }
+
+ private ValConverter initializeConverter(Object key) {
+
+ if (key instanceof Integer) {
+ return new ValConverter() {
+ @Override
+ public Writable get(Object key) {
+ return (new IntWritable((Integer) key));
+ }
+ };
+ } else if (key instanceof Byte) {
+ return new ValConverter() {
+ @Override
+ public Writable get(Object key) {
+ return (new ByteWritable((Byte) key));
+ }
+ };
+ } else if (key instanceof Boolean) {
+ return new ValConverter() {
+ @Override
+ public Writable get(Object key) {
+ return (new BooleanWritable((Boolean) key));
+ }
+ };
+ } else if (key instanceof Double) {
+ return new ValConverter() {
+ @Override
+ public Writable get(Object key) {
+ return (new DoubleWritable((Double) key));
+ }
+ };
+ } else if (key instanceof Float) {
+ return new ValConverter() {
+ @Override
+ public Writable get(Object key) {
+ return (new FloatWritable((Float) key));
+ }
+ };
+ } else if (key instanceof Long) {
+ return new ValConverter() {
+ @Override
+ public Writable get(Object key) {
+ return (new LongWritable((Long) key));
+ }
+ };
+ } else if (key instanceof String) {
+ return new ValConverter() {
+ @Override
+ public Writable get(Object key) {
+ return (new Text((String) key));
+ }
+ };
+ } else {
+ return new ValConverter() {
+ @Override
+ public Writable get(Object key) {
+ throw new UnsupportedOperationException("Unsupported recordkey data type " + key.getClass().getName());
+ }
+ };
+ }
+ }
+}
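
A small usage sketch (not part of the commit) for the lazy, type-resolved conversion in RecordkeyAdapter.convertKeyValue(): the first call fixes the converter to the key's Java type, so later calls must pass the same type. The values are made up.

    import org.apache.hadoop.io.Writable;
    import org.apache.hawq.pxf.plugins.hdfs.utilities.RecordkeyAdapter;

    public class RecordkeyConversionSketch {
        public static void main(String[] args) {
            RecordkeyAdapter adapter = new RecordkeyAdapter();

            Writable first = adapter.convertKeyValue(42);  // initializes an Integer converter
            Writable second = adapter.convertKeyValue(7);  // reuses the same converter
            System.out.println(first + " " + second);      // IntWritable values: 42 7

            // adapter.convertKeyValue("oops") would now fail with a ClassCastException,
            // since the converter was fixed to Integer on the first call.
        }
    }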
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/ChunkReaderTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/ChunkReaderTest.java b/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/ChunkReaderTest.java
deleted file mode 100644
index b34e6c5..0000000
--- a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/ChunkReaderTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.hdfs.DFSInputStream;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.mockito.stubbing.*;
-import org.mockito.invocation.*;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.mockito.Matchers.any;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-/**
- * Tester for the ChunkReader class
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ChunkReader.class})
-public class ChunkReaderTest {
-
- ChunkReader reader;
- /* Mocking the stream class that accesses the actual data */
- DFSInputStream mockStream;
-
- /*
- * setUp function called before each test.
- */
- @Before
- public void setUp() throws Exception {
- mockStream = mock(DFSInputStream.class);
- }
-
- /*
- * Simulate the empty file case
- */
- @Test
- public void readEmptyFile() throws Exception {
- reader = new ChunkReader(mockStream);
- when( mockStream.read( (byte [])Mockito.anyObject()) ).thenReturn(0);
-
- Writable out = new ChunkWritable();
- int maxBytesToConsume = 1024*1024;
- assertEquals(0, reader.readLine(out, maxBytesToConsume));
- }
-
- /*
- * Read one line
- */
- @Test
- public void readOneLine() throws Exception {
- reader = new ChunkReader(mockStream);
- when( mockStream.read( (byte [])Mockito.anyObject()) ).thenAnswer(new Answer<java.lang.Number>() {
- @Override
- public java.lang.Number answer(InvocationOnMock invocation) throws Throwable {
- byte[] buf = (byte[]) invocation.getArguments()[0];
-
- byte [] source = "OneLine\nTwoLine\n".getBytes();
- System.arraycopy(source, 0, buf, 0, source.length);
- return new java.lang.Byte(buf[0]);
- }
- });
-
- ChunkWritable out = new ChunkWritable();
- int maxBytesToConsume = 1024*1024;
- // read first line
- assertEquals("OneLine\n".length()
- , reader.readLine(out, maxBytesToConsume) );
- assertEquals("OneLine\n", new String(out.box) );
-
- // read second line
- assertEquals("TwoLine\n".length(), reader.readLine(out, maxBytesToConsume) );
- assertEquals("TwoLine\n", new String(out.box) );
- }
-
- /*
- * Read one line
- */
- @Test
- public void readChunk() throws Exception {
- reader = new ChunkReader(mockStream);
- when( mockStream.read( (byte [])Mockito.anyObject()) ).thenAnswer(new Answer<java.lang.Number>() {
- @Override
- public java.lang.Number answer(InvocationOnMock invocation) throws Throwable {
- byte[] buf = (byte[]) invocation.getArguments()[0];
-
- byte [] source = "OneLine\nTwoLine\n".getBytes();
- System.arraycopy(source, 0, buf, 0, source.length);
- return new java.lang.Integer(source.length);
- }
- });
-
- ChunkWritable out = new ChunkWritable();
- int maxBytesToConsume = 10; /* make readChunk return after reading the first "chunk": OneLine\nTwoLine\n */
- // read chunk
- assertEquals("OneLine\nTwoLine\n".length()
- , reader.readChunk(out, maxBytesToConsume) );
- assertEquals("OneLine\nTwoLine\n", new String(out.box) );
- }
-
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessorTest.java b/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessorTest.java
deleted file mode 100644
index 016f0ed..0000000
--- a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessorTest.java
+++ /dev/null
@@ -1,191 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.servlet.ServletContext;
-
-import org.apache.commons.logging.Log;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.mockito.Matchers.any;
-import static org.junit.Assert.*;
-
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-
-import static org.mockito.Mockito.*;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({SequenceFileAccessor.class, HdfsSplittableDataAccessor.class, HdfsUtilities.class})
-@SuppressStaticInitializationFor({"org.apache.hadoop.mapred.JobConf","org.apache.hadoop.fs.FileContext"}) // Prevents static inits
-public class SequenceFileAccessorTest {
-
- InputData inputData;
- SequenceFileAccessor accessor;
- Map<String, String> parameters;
- ServletContext mockContext;
- Configuration hdfsConfiguration;
- SequenceFileInputFormat<?,?> inFormat;
- JobConf jobConf = null;
- Path file;
- FileSystem fs;
- FileContext fc;
- Log mockLog;
-
- /*
- * setUp function called before each test.
- *
- * As the focus of the test is compression codec and type behavior, and
- * since the compression methods are private to SequenceFileAccessor, we
- * test their invocation and results by calling the public openForWrite().
- * Since openForWrite does some filesystem operations on HDFS, those had
- * to be mocked (and provided good material for a new Kafka story).
- */
- @Before
- public void setUp() throws Exception {
-
- mockContext = mock(ServletContext.class);
- inFormat = mock(SequenceFileInputFormat.class);
- hdfsConfiguration = mock(Configuration.class);
- jobConf = mock(JobConf.class);
- file = mock(Path.class);
- fs = mock(FileSystem.class);
- fc = mock(FileContext.class);
- inputData = mock(InputData.class);
-
- PowerMockito.mockStatic(FileContext.class);
- PowerMockito.mockStatic(HdfsUtilities.class);
- PowerMockito.whenNew(Path.class).withArguments(Mockito.anyString()).thenReturn(file);
-
- when(file.getFileSystem(any(Configuration.class))).thenReturn(fs);
- when(inputData.getDataSource()).thenReturn("deep.throat");
- when(inputData.getSegmentId()).thenReturn(0);
- when(FileContext.getFileContext()).thenReturn(fc);
- }
-
- /*
- * After each test is done, close the accessor if it was created
- */
- @After
- public void tearDown() throws Exception {
-
- if (accessor == null) {
- return;
- }
-
- accessor.closeForWrite();
- accessor = null;
- }
-
- private void constructAccessor() throws Exception {
-
- accessor = new SequenceFileAccessor(inputData);
- accessor.openForWrite();
- }
-
- private void mockCompressionOptions(String codec, String type)
- {
- when(inputData.getUserProperty("COMPRESSION_CODEC")).thenReturn(codec);
- when(inputData.getUserProperty("COMPRESSION_TYPE")).thenReturn(type);
- }
-
- @Test
- public void compressionNotSpecified() throws Exception {
-
- mockCompressionOptions(null, null);
- constructAccessor();
- assertEquals(SequenceFile.CompressionType.NONE, accessor.getCompressionType());
- assertEquals(null, accessor.getCodec());
- }
-
- @Test
- public void compressCodec() throws Exception {
-
- //using BZip2 as a valid example
- when(HdfsUtilities.getCodec((Configuration)Mockito.anyObject(), Mockito.anyString())).thenReturn(new BZip2Codec());
- mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", null);
- constructAccessor();
- assertEquals(".bz2", accessor.getCodec().getDefaultExtension());
- }
-
- @Test
- public void bogusCompressCodec() {
-
- final String codecName = "So I asked, who is he? He goes by the name of Wayne Rooney";
- when(HdfsUtilities.getCodec((Configuration)Mockito.anyObject(), Mockito.anyString())).thenThrow(new IllegalArgumentException("Compression codec " + codecName + " was not found."));
- mockCompressionOptions(codecName, null);
-
- try {
- constructAccessor();
- fail("should throw no codec found exception");
- } catch (Exception e) {
- assertEquals("Compression codec " + codecName + " was not found.", e.getMessage());
- }
- }
-
- @Test
- public void compressTypes() throws Exception {
-
- when(HdfsUtilities.getCodec((Configuration)Mockito.anyObject(), Mockito.anyString())).thenReturn(new BZip2Codec());
-
- //proper value
- mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", "BLOCK");
- constructAccessor();
- assertEquals(".bz2", accessor.getCodec().getDefaultExtension());
- assertEquals(org.apache.hadoop.io.SequenceFile.CompressionType.BLOCK, accessor.getCompressionType());
-
- //case (non) sensitivity
- mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", "ReCoRd");
- constructAccessor();
- assertEquals(".bz2", accessor.getCodec().getDefaultExtension());
- assertEquals(org.apache.hadoop.io.SequenceFile.CompressionType.RECORD, accessor.getCompressionType());
-
- //default (RECORD)
- mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", null);
- constructAccessor();
- assertEquals(".bz2", accessor.getCodec().getDefaultExtension());
- assertEquals(org.apache.hadoop.io.SequenceFile.CompressionType.RECORD, accessor.getCompressionType());
- }
-
- @Test
- public void illegalCompressTypes() throws Exception {
-
- when(HdfsUtilities.getCodec((Configuration)Mockito.anyObject(), Mockito.anyString())).thenReturn(new BZip2Codec());
- mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", "Oy");
-
- try {
- constructAccessor();
- fail("illegal COMPRESSION_TYPE should throw IllegalArgumentException");
- } catch (IllegalArgumentException e) {
- assertEquals("Illegal compression type 'Oy'", e.getMessage());
- }
-
- mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", "NONE");
-
- try {
- constructAccessor();
- fail("illegal COMPRESSION_TYPE should throw IllegalArgumentException");
- } catch (IllegalArgumentException e) {
- assertEquals("Illegal compression type 'NONE'. For disabling compression remove COMPRESSION_CODEC parameter.", e.getMessage());
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/StringPassResolverTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/StringPassResolverTest.java b/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/StringPassResolverTest.java
deleted file mode 100644
index 1191274..0000000
--- a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/StringPassResolverTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.OneField;
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.OutputFormat;
-import com.pivotal.pxf.service.BridgeInputBuilder;
-import com.pivotal.pxf.service.io.Text;
-import com.pivotal.pxf.service.utilities.ProtocolData;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.*;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({Text.class, BridgeInputBuilder.class, ProtocolData.class, LogFactory.class})
-public class StringPassResolverTest {
- ProtocolData mockProtocolData;
- Log mockLog;
-
- @Test
- /*
- * Test the setFields method: small \n terminated input
- */
- public void testSetFields() throws Exception {
- StringPassResolver resolver = buildResolver();
-
- byte[] data = new byte[]{(int) 'a', (int) 'b', (int) 'c', (int) 'd', (int) '\n',
- (int) 'n', (int) 'o', (int) '\n'};
-
- DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(data));
- BridgeInputBuilder inputBuilder = new BridgeInputBuilder(mockProtocolData);
- List<OneField> record = inputBuilder.makeInput(inputStream);
-
- OneRow oneRow = resolver.setFields(record);
- verifyOneRow(oneRow, Arrays.copyOfRange(data, 0, 5));
-
- record = inputBuilder.makeInput(inputStream);
- oneRow = resolver.setFields(record);
- verifyOneRow(oneRow, Arrays.copyOfRange(data, 5, 8));
- }
-
- @Test
- /*
- * Test the setFields method: input > buffer size, \n terminated
- */
- public void testSetFieldsBigArray() throws Exception {
-
- StringPassResolver resolver = buildResolver();
-
- byte[] bigArray = new byte[2000];
- for (int i = 0; i < 1999; ++i) {
- bigArray[i] = (byte) (i % 10 + 30);
- }
- bigArray[1999] = (byte) '\n';
-
- DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(bigArray));
- BridgeInputBuilder inputBuilder = new BridgeInputBuilder(mockProtocolData);
- List<OneField> record = inputBuilder.makeInput(inputStream);
-
- OneRow oneRow = resolver.setFields(record);
-
- verifyOneRow(oneRow, bigArray);
- }
-
- @Test
- /*
- * Test the setFields method: input > buffer size, no \n
- */
- public void testSetFieldsBigArrayNoNewLine() throws Exception {
-
- PowerMockito.mockStatic(LogFactory.class);
- mockLog = mock(Log.class);
- PowerMockito.when(LogFactory.getLog(any(Class.class))).thenReturn(mockLog);
-
- StringPassResolver resolver = buildResolver();
-
- byte[] bigArray = new byte[2000];
- for (int i = 0; i < 2000; ++i) {
- bigArray[i] = (byte) (i % 10 + 60);
- }
-
- DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(bigArray));
- BridgeInputBuilder inputBuilder = new BridgeInputBuilder(mockProtocolData);
- List<OneField> record = inputBuilder.makeInput(inputStream);
-
- OneRow oneRow = resolver.setFields(record);
-
- verifyOneRow(oneRow, bigArray);
-
- //verify(mockLog, atLeastOnce()).info(anyString());
- //Mockito.verify(mockLog).warn("Stream ended without line breaksdfljsldkj");
- //verifyWarning();
- }
-
- @Test
- /*
- * Test the setFields method: empty stream (returns -1)
- */
- public void testSetFieldsEmptyStream() throws Exception {
-
- StringPassResolver resolver = buildResolver();
-
- byte[] empty = new byte[0];
-
- DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(empty));
- BridgeInputBuilder inputBuilder = new BridgeInputBuilder(mockProtocolData);
- List<OneField> record = inputBuilder.makeInput(inputStream);
-
- OneRow oneRow = resolver.setFields(record);
-
- assertNull(oneRow);
- }
-
- /*
- * helpers functions
- */
- private StringPassResolver buildResolver()
- throws Exception {
-
- mockProtocolData = mock(ProtocolData.class);
- PowerMockito.when(mockProtocolData.outputFormat()).thenReturn(OutputFormat.TEXT);
-
- return new StringPassResolver(mockProtocolData);
- }
-
- private void verifyOneRow(OneRow oneRow, byte[] expected) {
- assertNull(oneRow.getKey());
- byte[] bytes = (byte[]) oneRow.getData();
- byte[] result = Arrays.copyOfRange(bytes, 0, bytes.length);
- assertEquals(result.length, expected.length);
- assertTrue(Arrays.equals(result, expected));
- }
-
-// private void verifyWarning() {
-// Mockito.verify(Log).warn("Stream ended without line break");
-// }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java b/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
deleted file mode 100644
index 83af8ba..0000000
--- a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs.utilities;
-
-import com.pivotal.pxf.api.OneField;
-import org.apache.commons.logging.Log;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.*;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@SuppressStaticInitializationFor("com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities")
-@PrepareForTest({HdfsUtilities.class, ReflectionUtils.class})
-public class HdfsUtilitiesTest {
-
- Configuration conf;
- CompressionCodecFactory factory;
- Log Log;
-
- @Before
- public void SetupCompressionFactory() {
- factory = mock(CompressionCodecFactory.class);
- Whitebox.setInternalState(HdfsUtilities.class, factory);
- Log = mock(Log.class);
- Whitebox.setInternalState(HdfsUtilities.class, Log);
- }
-
- @Test
- public void getCodecNoName() {
-
- Configuration conf = new Configuration();
- String name = "some.bad.codec";
-
- try {
- HdfsUtilities.getCodec(conf, name);
- fail("function should fail with bad codec name " + name);
- } catch (IllegalArgumentException e) {
- assertEquals(e.getMessage(), "Compression codec " + name + " was not found.");
- }
- }
-
- @Test
- public void getCodecNoConf() {
-
- Configuration conf = null;
- String name = "org.apache.hadoop.io.compress.GzipCodec";
-
- try {
- HdfsUtilities.getCodec(conf, name);
- fail("function should fail with when conf is null");
- } catch (NullPointerException e) {
- assertTrue(true);
- }
- }
-
- @Test
- public void getCodecGzip() {
-
- Configuration conf = new Configuration();
- String name = "org.apache.hadoop.io.compress.GzipCodec";
-
- PowerMockito.mockStatic(ReflectionUtils.class);
- GzipCodec gzipCodec = mock(GzipCodec.class);
-
- when(ReflectionUtils.newInstance(GzipCodec.class, conf)).thenReturn(gzipCodec);
-
- CompressionCodec codec = HdfsUtilities.getCodec(conf, name);
- assertNotNull(codec);
- assertEquals(codec, gzipCodec);
- }
-
- @Test
- public void isThreadSafe() {
-
- testIsThreadSafe(
- "readable compression, no compression - thread safe",
- "/some/path/without.compression",
- null, null,
- true);
-
- testIsThreadSafe(
- "readable compression, gzip compression - thread safe",
- "/some/compressed/path.gz",
- null, new GzipCodec(),
- true);
-
- testIsThreadSafe(
- "readable compression, bzip2 compression - not thread safe",
- "/some/path/with/bzip2.bz2",
- null, new BZip2Codec(),
- false);
-
- testIsThreadSafe(
- "writable compression, no compression codec - thread safe",
- "/some/path",
- null, null,
- true);
-
- testIsThreadSafe(
- "writable compression, some compression codec - thread safe",
- "/some/path",
- "I.am.a.nice.codec", new NotSoNiceCodec(),
- true);
-
- testIsThreadSafe(
- "writable compression, compression codec bzip2 - not thread safe",
- "/some/path",
- "org.apache.hadoop.io.compress.BZip2Codec", new BZip2Codec(),
- false);
- }
-
- private void testIsThreadSafe(String testDescription, String path, String codecStr, CompressionCodec codec, boolean expectedResult) {
- prepareDataForIsThreadSafe(path, codecStr, codec);
-
- boolean result = HdfsUtilities.isThreadSafe(path, codecStr);
- assertTrue(testDescription, result == expectedResult);
- }
-
- private void prepareDataForIsThreadSafe(String dataDir, String codecStr, CompressionCodec codec) {
- try {
- conf = PowerMockito.mock(Configuration.class);
- PowerMockito.whenNew(Configuration.class).withNoArguments().thenReturn(conf);
- } catch (Exception e) {
- fail("new Configuration mocking failed");
- }
-
- if (codecStr == null) {
- when(factory.getCodec(new Path(dataDir))).thenReturn(codec);
- } else {
- PowerMockito.stub(PowerMockito.method(HdfsUtilities.class, "getCodecClass")).toReturn(codec.getClass());
- }
- }
-
- @Test
- public void isSplittableCodec() {
-
- testIsSplittableCodec("no codec - splittable",
- "some/innocent.file", null, true);
- testIsSplittableCodec("gzip codec - not splittable",
- "/gzip.gz", new GzipCodec(), false);
- testIsSplittableCodec("default codec - not splittable",
- "/default.deflate", new DefaultCodec(), false);
- testIsSplittableCodec("bzip2 codec - splittable",
- "bzip2.bz2", new BZip2Codec(), true);
- }
-
- private void testIsSplittableCodec(String description,
- String pathName, CompressionCodec codec, boolean expected) {
- Path path = new Path(pathName);
- when(factory.getCodec(path)).thenReturn(codec);
-
- boolean result = HdfsUtilities.isSplittableCodec(path);
- assertEquals(description, result, expected);
- }
-
- @Test
- public void testToString() {
- List<OneField> oneFields = Arrays.asList(new OneField(1, "uno"), new OneField(2, "dos"), new OneField(3, "tres"));
-
- assertEquals("uno!dos!tres", HdfsUtilities.toString(oneFields, "!"));
-
- assertEquals("uno", HdfsUtilities.toString(Collections.singletonList(oneFields.get(0)), "!"));
-
- assertEquals("", HdfsUtilities.toString(Collections.<OneField>emptyList(), "!"));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/NotSoNiceCodec.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/NotSoNiceCodec.java b/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/NotSoNiceCodec.java
deleted file mode 100644
index 4f5b18b..0000000
--- a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/NotSoNiceCodec.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs.utilities;
-
-import org.apache.hadoop.io.compress.*;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * Codec class for UtilitiesTest
- * Can't be embedded inside UtilitiesTest due to junit limitation.
- */
-public class NotSoNiceCodec implements CompressionCodec {
-
- @Override
- public CompressionOutputStream createOutputStream(OutputStream out)
- throws IOException {
- return null;
- }
-
- @Override
- public CompressionOutputStream createOutputStream(OutputStream out,
- Compressor compressor) throws IOException {
- return null;
- }
-
- @Override
- public Class<? extends Compressor> getCompressorType() {
- return null;
- }
-
- @Override
- public Compressor createCompressor() {
- return null;
- }
-
- @Override
- public CompressionInputStream createInputStream(InputStream in)
- throws IOException {
- return null;
- }
-
- @Override
- public CompressionInputStream createInputStream(InputStream in,
- Decompressor decompressor) throws IOException {
- return null;
- }
-
- @Override
- public Class<? extends Decompressor> getDecompressorType() {
- return null;
- }
-
- @Override
- public Decompressor createDecompressor() {
- return null;
- }
-
- @Override
- public String getDefaultExtension() {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapterTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapterTest.java b/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapterTest.java
deleted file mode 100644
index b0c9a3a..0000000
--- a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapterTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs.utilities;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.*;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({RecordkeyAdapter.class, LogFactory.class})
-public class RecordkeyAdapterTest {
- Log Log;
- RecordkeyAdapter recordkeyAdapter;
-
- /**
- * Test convertKeyValue for Integer type
- */
- @Test
- public void convertKeyValueInteger() {
- int key = 13;
- initRecordkeyAdapter();
- runConvertKeyValue(key, new IntWritable(key));
- }
-
- /**
- * Test convertKeyValue for Boolean type
- */
- @Test
- public void convertKeyValueBoolean() {
- boolean key = true;
- initRecordkeyAdapter();
- runConvertKeyValue(key, new BooleanWritable(key));
- }
-
- /**
- * Test convertKeyValue for Byte type
- */
- @Test
- public void convertKeyValueByte() {
- byte key = 1;
- initRecordkeyAdapter();
- runConvertKeyValue(key, new ByteWritable(key));
- }
-
- /**
- * Test convertKeyValue for Double type
- */
- @Test
- public void convertKeyValueDouble() {
- double key = 2.3;
- initRecordkeyAdapter();
- runConvertKeyValue(key, new DoubleWritable(key));
- }
-
- /**
- * Test convertKeyValue for Float type
- */
- @Test
- public void convertKeyValueFloat() {
- float key = (float) 2.3;
- initRecordkeyAdapter();
- runConvertKeyValue(key, new FloatWritable(key));
- }
-
- /**
- * Test convertKeyValue for Long type
- */
- @Test
- public void convertKeyValueLong() {
- long key = 12345678901234567l;
- initRecordkeyAdapter();
- runConvertKeyValue(key, new LongWritable(key));
- }
-
- /**
- * Test convertKeyValue for String type
- */
- @Test
- public void convertKeyValueString() {
- String key = "key";
- initRecordkeyAdapter();
- runConvertKeyValue(key, new Text(key));
- }
-
- /**
- * Test convertKeyValue for several calls of the same type
- */
- @Test
- public void convertKeyValueManyCalls() {
- Boolean key = true;
- mockLog();
- initRecordkeyAdapter();
- runConvertKeyValue(key, new BooleanWritable(key));
- verifyLog("converter initialized for type " + key.getClass() +
- " (key value: " + key + ")");
-
- for (int i = 0; i < 5; ++i) {
- key = (i % 2) == 0;
- runConvertKeyValue(key, new BooleanWritable(key));
- }
- verifyLogOnlyOnce();
- }
-
- /**
- * Test convertKeyValue for boolean type and then string type - negative
- * test
- */
- @Test
- public void convertKeyValueBadSecondValue() {
- boolean key = true;
- initRecordkeyAdapter();
- runConvertKeyValue(key, new BooleanWritable(key));
- String badKey = "bad";
- try {
- recordkeyAdapter.convertKeyValue(badKey);
- fail("conversion of string to boolean should fail");
- } catch (ClassCastException e) {
- assertEquals(e.getMessage(),
- "java.lang.String cannot be cast to java.lang.Boolean");
- }
- }
-
- private void initRecordkeyAdapter() {
- recordkeyAdapter = new RecordkeyAdapter();
- }
-
- private void runConvertKeyValue(Object key, Writable expected) {
- Writable writable = recordkeyAdapter.convertKeyValue(key);
- assertEquals(writable, expected);
- }
-
- private void mockLog() {
- PowerMockito.mockStatic(LogFactory.class);
- Log = mock(Log.class);
- when(LogFactory.getLog(RecordkeyAdapter.class)).thenReturn(Log);
- }
-
- private void verifyLog(String msg) {
- Mockito.verify(Log).debug(msg);
- }
-
- private void verifyLogOnlyOnce() {
- Mockito.verify(Log, Mockito.times(1)).debug(Mockito.any());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/ChunkReaderTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/ChunkReaderTest.java b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/ChunkReaderTest.java
new file mode 100644
index 0000000..6b30e68
--- /dev/null
+++ b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/ChunkReaderTest.java
@@ -0,0 +1,119 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hdfs.DFSInputStream;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.stubbing.*;
+import org.mockito.invocation.*;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Matchers.any;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tester for the ChunkReader class
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ChunkReader.class})
+public class ChunkReaderTest {
+
+ ChunkReader reader;
+ /* Mocking the stream class that accesses the actual data */
+ DFSInputStream mockStream;
+
+ /*
+ * setUp function called before each test.
+ */
+ @Before
+ public void setUp() throws Exception {
+ mockStream = mock(DFSInputStream.class);
+ }
+
+ /*
+ * Simulate the empty file case
+ */
+ @Test
+ public void readEmptyFile() throws Exception {
+ reader = new ChunkReader(mockStream);
+ when( mockStream.read( (byte [])Mockito.anyObject()) ).thenReturn(0);
+
+ Writable out = new ChunkWritable();
+ int maxBytesToConsume = 1024*1024;
+ assertEquals(0, reader.readLine(out, maxBytesToConsume));
+ }
+
+ /*
+ * Read one line
+ */
+ @Test
+ public void readOneLine() throws Exception {
+ reader = new ChunkReader(mockStream);
+ when( mockStream.read( (byte [])Mockito.anyObject()) ).thenAnswer(new Answer<java.lang.Number>() {
+ @Override
+ public java.lang.Number answer(InvocationOnMock invocation) throws Throwable {
+ byte[] buf = (byte[]) invocation.getArguments()[0];
+
+ byte [] source = "OneLine\nTwoLine\n".getBytes();
+ System.arraycopy(source, 0, buf, 0, source.length);
+ return new java.lang.Byte(buf[0]);
+ }
+ });
+
+ ChunkWritable out = new ChunkWritable();
+ int maxBytesToConsume = 1024*1024;
+ // read first line
+ assertEquals("OneLine\n".length()
+ , reader.readLine(out, maxBytesToConsume) );
+ assertEquals("OneLine\n", new String(out.box) );
+
+ // read second line
+ assertEquals("TwoLine\n".length(), reader.readLine(out, maxBytesToConsume) );
+ assertEquals("TwoLine\n", new String(out.box) );
+ }
+
+ /*
+ * Read one chunk
+ */
+ @Test
+ public void readChunk() throws Exception {
+ reader = new ChunkReader(mockStream);
+ when( mockStream.read( (byte [])Mockito.anyObject()) ).thenAnswer(new Answer<java.lang.Number>() {
+ @Override
+ public java.lang.Number answer(InvocationOnMock invocation) throws Throwable {
+ byte[] buf = (byte[]) invocation.getArguments()[0];
+
+ byte [] source = "OneLine\nTwoLine\n".getBytes();
+ System.arraycopy(source, 0, buf, 0, source.length);
+ return new java.lang.Integer(source.length);
+ }
+ });
+
+ ChunkWritable out = new ChunkWritable();
+ int maxBytesToConsume = 10; /* make readChunk return after reading the first "chunk": OneLine\nTwoLine\n */
+ // read chunk
+ assertEquals("OneLine\nTwoLine\n".length()
+ , reader.readChunk(out, maxBytesToConsume) );
+ assertEquals("OneLine\nTwoLine\n", new String(out.box) );
+ }
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/SequenceFileAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/SequenceFileAccessorTest.java b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/SequenceFileAccessorTest.java
new file mode 100644
index 0000000..d2e43ad
--- /dev/null
+++ b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/SequenceFileAccessorTest.java
@@ -0,0 +1,191 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.ServletContext;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Matchers.any;
+import static org.junit.Assert.*;
+
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+
+import static org.mockito.Mockito.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SequenceFileAccessor.class, HdfsSplittableDataAccessor.class, HdfsUtilities.class})
+@SuppressStaticInitializationFor({"org.apache.hadoop.mapred.JobConf","org.apache.hadoop.fs.FileContext"}) // Prevents static inits
+public class SequenceFileAccessorTest {
+
+ InputData inputData;
+ SequenceFileAccessor accessor;
+ Map<String, String> parameters;
+ ServletContext mockContext;
+ Configuration hdfsConfiguration;
+ SequenceFileInputFormat<?,?> inFormat;
+ JobConf jobConf = null;
+ Path file;
+ FileSystem fs;
+ FileContext fc;
+ Log mockLog;
+
+ /*
+ * setUp function called before each test.
+ *
+ * As the focus of the test is compression codec and type behavior, and
+ * since the compression methods are private to SequenceFileAccessor, we
+ * test their invocation and results by calling the public openForWrite().
+ * Since openForWrite does some filesystem operations on HDFS, those had
+ * to be mocked (and provided good material for a new Kafka story).
+ */
+ @Before
+ public void setUp() throws Exception {
+
+ mockContext = mock(ServletContext.class);
+ inFormat = mock(SequenceFileInputFormat.class);
+ hdfsConfiguration = mock(Configuration.class);
+ jobConf = mock(JobConf.class);
+ file = mock(Path.class);
+ fs = mock(FileSystem.class);
+ fc = mock(FileContext.class);
+ inputData = mock(InputData.class);
+
+ PowerMockito.mockStatic(FileContext.class);
+ PowerMockito.mockStatic(HdfsUtilities.class);
+ PowerMockito.whenNew(Path.class).withArguments(Mockito.anyString()).thenReturn(file);
+
+ when(file.getFileSystem(any(Configuration.class))).thenReturn(fs);
+ when(inputData.getDataSource()).thenReturn("deep.throat");
+ when(inputData.getSegmentId()).thenReturn(0);
+ when(FileContext.getFileContext()).thenReturn(fc);
+ }
+
+ /*
+ * After each test is done, close the accessor if it was created
+ */
+ @After
+ public void tearDown() throws Exception {
+
+ if (accessor == null) {
+ return;
+ }
+
+ accessor.closeForWrite();
+ accessor = null;
+ }
+
+ private void constructAccessor() throws Exception {
+
+ accessor = new SequenceFileAccessor(inputData);
+ accessor.openForWrite();
+ }
+
+ private void mockCompressionOptions(String codec, String type)
+ {
+ when(inputData.getUserProperty("COMPRESSION_CODEC")).thenReturn(codec);
+ when(inputData.getUserProperty("COMPRESSION_TYPE")).thenReturn(type);
+ }
+
+ @Test
+ public void compressionNotSpecified() throws Exception {
+
+ mockCompressionOptions(null, null);
+ constructAccessor();
+ assertEquals(SequenceFile.CompressionType.NONE, accessor.getCompressionType());
+ assertEquals(null, accessor.getCodec());
+ }
+
+ @Test
+ public void compressCodec() throws Exception {
+
+ //using BZip2 as a valid example
+ when(HdfsUtilities.getCodec((Configuration)Mockito.anyObject(), Mockito.anyString())).thenReturn(new BZip2Codec());
+ mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", null);
+ constructAccessor();
+ assertEquals(".bz2", accessor.getCodec().getDefaultExtension());
+ }
+
+ @Test
+ public void bogusCompressCodec() {
+
+ final String codecName = "So I asked, who is he? He goes by the name of Wayne Rooney";
+ when(HdfsUtilities.getCodec((Configuration)Mockito.anyObject(), Mockito.anyString())).thenThrow(new IllegalArgumentException("Compression codec " + codecName + " was not found."));
+ mockCompressionOptions(codecName, null);
+
+ try {
+ constructAccessor();
+ fail("should throw no codec found exception");
+ } catch (Exception e) {
+ assertEquals("Compression codec " + codecName + " was not found.", e.getMessage());
+ }
+ }
+
+ @Test
+ public void compressTypes() throws Exception {
+
+ when(HdfsUtilities.getCodec((Configuration)Mockito.anyObject(), Mockito.anyString())).thenReturn(new BZip2Codec());
+
+ //proper value
+ mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", "BLOCK");
+ constructAccessor();
+ assertEquals(".bz2", accessor.getCodec().getDefaultExtension());
+ assertEquals(org.apache.hadoop.io.SequenceFile.CompressionType.BLOCK, accessor.getCompressionType());
+
+ //case (non) sensitivity
+ mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", "ReCoRd");
+ constructAccessor();
+ assertEquals(".bz2", accessor.getCodec().getDefaultExtension());
+ assertEquals(org.apache.hadoop.io.SequenceFile.CompressionType.RECORD, accessor.getCompressionType());
+
+ //default (RECORD)
+ mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", null);
+ constructAccessor();
+ assertEquals(".bz2", accessor.getCodec().getDefaultExtension());
+ assertEquals(org.apache.hadoop.io.SequenceFile.CompressionType.RECORD, accessor.getCompressionType());
+ }
+
+ @Test
+ public void illegalCompressTypes() throws Exception {
+
+ when(HdfsUtilities.getCodec((Configuration)Mockito.anyObject(), Mockito.anyString())).thenReturn(new BZip2Codec());
+ mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", "Oy");
+
+ try {
+ constructAccessor();
+ fail("illegal COMPRESSION_TYPE should throw IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ assertEquals("Illegal compression type 'Oy'", e.getMessage());
+ }
+
+ mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", "NONE");
+
+ try {
+ constructAccessor();
+ fail("illegal COMPRESSION_TYPE should throw IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ assertEquals("Illegal compression type 'NONE'. For disabling compression remove COMPRESSION_CODEC parameter.", e.getMessage());
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolverTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolverTest.java b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolverTest.java
new file mode 100644
index 0000000..af713c7
--- /dev/null
+++ b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolverTest.java
@@ -0,0 +1,149 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.OutputFormat;
+import org.apache.hawq.pxf.service.BridgeInputBuilder;
+import org.apache.hawq.pxf.service.io.Text;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Text.class, BridgeInputBuilder.class, ProtocolData.class, LogFactory.class})
+public class StringPassResolverTest {
+ ProtocolData mockProtocolData;
+ Log mockLog;
+
+ @Test
+ /*
+ * Test the setFields method: small \n terminated input
+ */
+ public void testSetFields() throws Exception {
+ StringPassResolver resolver = buildResolver();
+
+ byte[] data = new byte[]{(int) 'a', (int) 'b', (int) 'c', (int) 'd', (int) '\n',
+ (int) 'n', (int) 'o', (int) '\n'};
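+ // two newline-terminated records: "abcd\n" (bytes 0-4) and "no\n" (bytes 5-7)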
+
+ DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(data));
+ BridgeInputBuilder inputBuilder = new BridgeInputBuilder(mockProtocolData);
+ List<OneField> record = inputBuilder.makeInput(inputStream);
+
+ OneRow oneRow = resolver.setFields(record);
+ verifyOneRow(oneRow, Arrays.copyOfRange(data, 0, 5));
+
+ record = inputBuilder.makeInput(inputStream);
+ oneRow = resolver.setFields(record);
+ verifyOneRow(oneRow, Arrays.copyOfRange(data, 5, 8));
+ }
+
+ @Test
+ /*
+ * Test the setFields method: input > buffer size, \n terminated
+ */
+ public void testSetFieldsBigArray() throws Exception {
+
+ StringPassResolver resolver = buildResolver();
+
+ byte[] bigArray = new byte[2000];
+ for (int i = 0; i < 1999; ++i) {
+ bigArray[i] = (byte) (i % 10 + 30);
+ }
+ bigArray[1999] = (byte) '\n';
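+ // a single record: 1999 payload bytes plus a terminating '\n', deliberately
+ // larger than the internal read buffer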
+
+ DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(bigArray));
+ BridgeInputBuilder inputBuilder = new BridgeInputBuilder(mockProtocolData);
+ List<OneField> record = inputBuilder.makeInput(inputStream);
+
+ OneRow oneRow = resolver.setFields(record);
+
+ verifyOneRow(oneRow, bigArray);
+ }
+
+ @Test
+ /*
+ * Test the setFields method: input > buffer size, no \n
+ */
+ public void testSetFieldsBigArrayNoNewLine() throws Exception {
+
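+ // mock LogFactory so the resolver gets a mock Log; the commented-out
+ // verifications below are meant to catch the missing-line-break warning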
+ PowerMockito.mockStatic(LogFactory.class);
+ mockLog = mock(Log.class);
+ PowerMockito.when(LogFactory.getLog(any(Class.class))).thenReturn(mockLog);
+
+ StringPassResolver resolver = buildResolver();
+
+ byte[] bigArray = new byte[2000];
+ for (int i = 0; i < 2000; ++i) {
+ bigArray[i] = (byte) (i % 10 + 60);
+ }
+
+ DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(bigArray));
+ BridgeInputBuilder inputBuilder = new BridgeInputBuilder(mockProtocolData);
+ List<OneField> record = inputBuilder.makeInput(inputStream);
+
+ OneRow oneRow = resolver.setFields(record);
+
+ verifyOneRow(oneRow, bigArray);
+
+ //verify(mockLog, atLeastOnce()).info(anyString());
+ //Mockito.verify(mockLog).warn("Stream ended without line break");
+ //verifyWarning();
+ }
+
+ @Test
+ /*
+ * Test the setFields method: empty stream (returns -1)
+ */
+ public void testSetFieldsEmptyStream() throws Exception {
+
+ StringPassResolver resolver = buildResolver();
+
+ byte[] empty = new byte[0];
+
+ DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(empty));
+ BridgeInputBuilder inputBuilder = new BridgeInputBuilder(mockProtocolData);
+ List<OneField> record = inputBuilder.makeInput(inputStream);
+
+ OneRow oneRow = resolver.setFields(record);
+
+ assertNull(oneRow);
+ }
+
+ /*
+ * helper functions
+ */
+ private StringPassResolver buildResolver()
+ throws Exception {
+
+ mockProtocolData = mock(ProtocolData.class);
+ PowerMockito.when(mockProtocolData.outputFormat()).thenReturn(OutputFormat.TEXT);
+
+ return new StringPassResolver(mockProtocolData);
+ }
+
+ private void verifyOneRow(OneRow oneRow, byte[] expected) {
+ assertNull(oneRow.getKey());
+ byte[] bytes = (byte[]) oneRow.getData();
+ byte[] result = Arrays.copyOfRange(bytes, 0, bytes.length);
+ assertEquals(expected.length, result.length);
+ assertArrayEquals(expected, result);
+ }
+
+// private void verifyWarning() {
+// Mockito.verify(Log).warn("Stream ended without line break");
+// }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
new file mode 100644
index 0000000..ebdc495
--- /dev/null
+++ b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
@@ -0,0 +1,181 @@
+package org.apache.hawq.pxf.plugins.hdfs.utilities;
+
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@SuppressStaticInitializationFor("org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities")
+@PrepareForTest({HdfsUtilities.class, ReflectionUtils.class})
+public class HdfsUtilitiesTest {
+
+ Configuration conf;
+ CompressionCodecFactory factory;
+ Log Log;
+
+ @Before
+ public void setupCompressionFactory() {
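+ // Replace HdfsUtilities' static CompressionCodecFactory and Log with mocks;
+ // the real static initializer is suppressed by @SuppressStaticInitializationFor.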
+ factory = mock(CompressionCodecFactory.class);
+ Whitebox.setInternalState(HdfsUtilities.class, factory);
+ Log = mock(Log.class);
+ Whitebox.setInternalState(HdfsUtilities.class, Log);
+ }
+
+ @Test
+ public void getCodecNoName() {
+
+ Configuration conf = new Configuration();
+ String name = "some.bad.codec";
+
+ try {
+ HdfsUtilities.getCodec(conf, name);
+ fail("function should fail with bad codec name " + name);
+ } catch (IllegalArgumentException e) {
+ assertEquals(e.getMessage(), "Compression codec " + name + " was not found.");
+ }
+ }
+
+ @Test
+ public void getCodecNoConf() {
+
+ Configuration conf = null;
+ String name = "org.apache.hadoop.io.compress.GzipCodec";
+
+ try {
+ HdfsUtilities.getCodec(conf, name);
+ fail("function should fail with when conf is null");
+ } catch (NullPointerException e) {
+ // expected: getCodec cannot work without a configuration
+ }
+ }
+
+ @Test
+ public void getCodecGzip() {
+
+ Configuration conf = new Configuration();
+ String name = "org.apache.hadoop.io.compress.GzipCodec";
+
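+ // ReflectionUtils.newInstance is stubbed so getCodec() returns the mocked
+ // GzipCodec instead of constructing a real one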
+ PowerMockito.mockStatic(ReflectionUtils.class);
+ GzipCodec gzipCodec = mock(GzipCodec.class);
+
+ when(ReflectionUtils.newInstance(GzipCodec.class, conf)).thenReturn(gzipCodec);
+
+ CompressionCodec codec = HdfsUtilities.getCodec(conf, name);
+ assertNotNull(codec);
+ assertEquals(codec, gzipCodec);
+ }
+
+ @Test
+ public void isThreadSafe() {
+
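+ // of all the codecs exercised below, only bzip2 is expected to be
+ // non thread safe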
+ testIsThreadSafe(
+ "readable compression, no compression - thread safe",
+ "/some/path/without.compression",
+ null, null,
+ true);
+
+ testIsThreadSafe(
+ "readable compression, gzip compression - thread safe",
+ "/some/compressed/path.gz",
+ null, new GzipCodec(),
+ true);
+
+ testIsThreadSafe(
+ "readable compression, bzip2 compression - not thread safe",
+ "/some/path/with/bzip2.bz2",
+ null, new BZip2Codec(),
+ false);
+
+ testIsThreadSafe(
+ "writable compression, no compression codec - thread safe",
+ "/some/path",
+ null, null,
+ true);
+
+ testIsThreadSafe(
+ "writable compression, some compression codec - thread safe",
+ "/some/path",
+ "I.am.a.nice.codec", new NotSoNiceCodec(),
+ true);
+
+ testIsThreadSafe(
+ "writable compression, compression codec bzip2 - not thread safe",
+ "/some/path",
+ "org.apache.hadoop.io.compress.BZip2Codec", new BZip2Codec(),
+ false);
+ }
+
+ private void testIsThreadSafe(String testDescription, String path, String codecStr, CompressionCodec codec, boolean expectedResult) {
+ prepareDataForIsThreadSafe(path, codecStr, codec);
+
+ boolean result = HdfsUtilities.isThreadSafe(path, codecStr);
+ assertTrue(testDescription, result == expectedResult);
+ }
+
+ private void prepareDataForIsThreadSafe(String dataDir, String codecStr, CompressionCodec codec) {
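+ // no codec string: stub the factory lookup by file suffix (read case);
+ // codec string supplied: stub getCodecClass (write case)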
+ try {
+ conf = PowerMockito.mock(Configuration.class);
+ PowerMockito.whenNew(Configuration.class).withNoArguments().thenReturn(conf);
+ } catch (Exception e) {
+ fail("new Configuration mocking failed");
+ }
+
+ if (codecStr == null) {
+ when(factory.getCodec(new Path(dataDir))).thenReturn(codec);
+ } else {
+ PowerMockito.stub(PowerMockito.method(HdfsUtilities.class, "getCodecClass")).toReturn(codec.getClass());
+ }
+ }
+
+ @Test
+ public void isSplittableCodec() {
+
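+ // among the compressed formats below only bzip2 is splittable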
+ testIsSplittableCodec("no codec - splittable",
+ "some/innocent.file", null, true);
+ testIsSplittableCodec("gzip codec - not splittable",
+ "/gzip.gz", new GzipCodec(), false);
+ testIsSplittableCodec("default codec - not splittable",
+ "/default.deflate", new DefaultCodec(), false);
+ testIsSplittableCodec("bzip2 codec - splittable",
+ "bzip2.bz2", new BZip2Codec(), true);
+ }
+
+ private void testIsSplittableCodec(String description,
+ String pathName, CompressionCodec codec, boolean expected) {
+ Path path = new Path(pathName);
+ when(factory.getCodec(path)).thenReturn(codec);
+
+ boolean result = HdfsUtilities.isSplittableCodec(path);
+ assertEquals(description, expected, result);
+ }
+
+ @Test
+ public void testToString() {
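+ // toString should join the field values with the given delimiter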
+ List<OneField> oneFields = Arrays.asList(new OneField(1, "uno"), new OneField(2, "dos"), new OneField(3, "tres"));
+
+ assertEquals("uno!dos!tres", HdfsUtilities.toString(oneFields, "!"));
+
+ assertEquals("uno", HdfsUtilities.toString(Collections.singletonList(oneFields.get(0)), "!"));
+
+ assertEquals("", HdfsUtilities.toString(Collections.<OneField>emptyList(), "!"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/NotSoNiceCodec.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/NotSoNiceCodec.java b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/NotSoNiceCodec.java
new file mode 100644
index 0000000..671f5d8
--- /dev/null
+++ b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/NotSoNiceCodec.java
@@ -0,0 +1,64 @@
+package org.apache.hawq.pxf.plugins.hdfs.utilities;
+
+import org.apache.hadoop.io.compress.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Codec class for HdfsUtilitiesTest.
+ * Can't be embedded inside HdfsUtilitiesTest due to a JUnit limitation.
+ */
+public class NotSoNiceCodec implements CompressionCodec {
+
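+ // All methods intentionally return null: the tests only rely on the codec's
+ // class, never on actual compression behavior.
+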
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out,
+ Compressor compressor) throws IOException {
+ return null;
+ }
+
+ @Override
+ public Class<? extends Compressor> getCompressorType() {
+ return null;
+ }
+
+ @Override
+ public Compressor createCompressor() {
+ return null;
+ }
+
+ @Override
+ public CompressionInputStream createInputStream(InputStream in)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public CompressionInputStream createInputStream(InputStream in,
+ Decompressor decompressor) throws IOException {
+ return null;
+ }
+
+ @Override
+ public Class<? extends Decompressor> getDecompressorType() {
+ return null;
+ }
+
+ @Override
+ public Decompressor createDecompressor() {
+ return null;
+ }
+
+ @Override
+ public String getDefaultExtension() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/RecordkeyAdapterTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/RecordkeyAdapterTest.java b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/RecordkeyAdapterTest.java
new file mode 100644
index 0000000..1f68b09
--- /dev/null
+++ b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/RecordkeyAdapterTest.java
@@ -0,0 +1,154 @@
+package org.apache.hawq.pxf.plugins.hdfs.utilities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.*;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({RecordkeyAdapter.class, LogFactory.class})
+public class RecordkeyAdapterTest {
+ Log Log;
+ RecordkeyAdapter recordkeyAdapter;
+
+ /**
+ * Test convertKeyValue for Integer type
+ */
+ @Test
+ public void convertKeyValueInteger() {
+ int key = 13;
+ initRecordkeyAdapter();
+ runConvertKeyValue(key, new IntWritable(key));
+ }
+
+ /**
+ * Test convertKeyValue for Boolean type
+ */
+ @Test
+ public void convertKeyValueBoolean() {
+ boolean key = true;
+ initRecordkeyAdapter();
+ runConvertKeyValue(key, new BooleanWritable(key));
+ }
+
+ /**
+ * Test convertKeyValue for Byte type
+ */
+ @Test
+ public void convertKeyValueByte() {
+ byte key = 1;
+ initRecordkeyAdapter();
+ runConvertKeyValue(key, new ByteWritable(key));
+ }
+
+ /**
+ * Test convertKeyValue for Double type
+ */
+ @Test
+ public void convertKeyValueDouble() {
+ double key = 2.3;
+ initRecordkeyAdapter();
+ runConvertKeyValue(key, new DoubleWritable(key));
+ }
+
+ /**
+ * Test convertKeyValue for Float type
+ */
+ @Test
+ public void convertKeyValueFloat() {
+ float key = (float) 2.3;
+ initRecordkeyAdapter();
+ runConvertKeyValue(key, new FloatWritable(key));
+ }
+
+ /**
+ * Test convertKeyValue for Long type
+ */
+ @Test
+ public void convertKeyValueLong() {
+ long key = 12345678901234567L;
+ initRecordkeyAdapter();
+ runConvertKeyValue(key, new LongWritable(key));
+ }
+
+ /**
+ * Test convertKeyValue for String type
+ */
+ @Test
+ public void convertKeyValueString() {
+ String key = "key";
+ initRecordkeyAdapter();
+ runConvertKeyValue(key, new Text(key));
+ }
+
+ /**
+ * Test convertKeyValue for several calls of the same type
+ */
+ @Test
+ public void convertKeyValueManyCalls() {
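+ // the converter is resolved on the first call only, so the initialization
+ // debug message must be logged exactly once across repeated conversions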
+ Boolean key = true;
+ mockLog();
+ initRecordkeyAdapter();
+ runConvertKeyValue(key, new BooleanWritable(key));
+ verifyLog("converter initialized for type " + key.getClass() +
+ " (key value: " + key + ")");
+
+ for (int i = 0; i < 5; ++i) {
+ key = (i % 2) == 0;
+ runConvertKeyValue(key, new BooleanWritable(key));
+ }
+ verifyLogOnlyOnce();
+ }
+
+ /**
+ * Test convertKeyValue for boolean type and then string type - negative
+ * test
+ */
+ @Test
+ public void convertKeyValueBadSecondValue() {
+ boolean key = true;
+ initRecordkeyAdapter();
+ runConvertKeyValue(key, new BooleanWritable(key));
+ String badKey = "bad";
+ try {
+ recordkeyAdapter.convertKeyValue(badKey);
+ fail("conversion of string to boolean should fail");
+ } catch (ClassCastException e) {
+ assertEquals(e.getMessage(),
+ "java.lang.String cannot be cast to java.lang.Boolean");
+ }
+ }
+
+ private void initRecordkeyAdapter() {
+ recordkeyAdapter = new RecordkeyAdapter();
+ }
+
+ private void runConvertKeyValue(Object key, Writable expected) {
+ Writable writable = recordkeyAdapter.convertKeyValue(key);
+ assertEquals(writable, expected);
+ }
+
+ private void mockLog() {
+ PowerMockito.mockStatic(LogFactory.class);
+ Log = mock(Log.class);
+ when(LogFactory.getLog(RecordkeyAdapter.class)).thenReturn(Log);
+ }
+
+ private void verifyLog(String msg) {
+ Mockito.verify(Log).debug(msg);
+ }
+
+ private void verifyLogOnlyOnce() {
+ Mockito.verify(Log, Mockito.times(1)).debug(Mockito.any());
+ }
+}