Posted to commits@apex.apache.org by th...@apache.org on 2017/05/23 01:24:02 UTC
[04/13] apex-malhar git commit: Changed package paths for files to be
included under malhar. Modified build files so the project builds
under malhar.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/storage/ErrorMaskingEventCodec.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/storage/ErrorMaskingEventCodec.java b/flume/src/main/java/org/apache/apex/malhar/flume/storage/ErrorMaskingEventCodec.java
new file mode 100644
index 0000000..b8d2725
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/storage/ErrorMaskingEventCodec.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Event;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>ErrorMaskingEventCodec: an EventCodec that logs a warning and returns null instead of
+ * propagating the RuntimeException when an event cannot be serialized or deserialized.</p>
+ *
+ * @since 1.0.4
+ */
+public class ErrorMaskingEventCodec extends EventCodec
+{
+
+ @Override
+ public Object fromByteArray(Slice fragment)
+ {
+ try {
+ return super.fromByteArray(fragment);
+ } catch (RuntimeException re) {
+ logger.warn("Cannot deserialize event {}", fragment, re);
+ }
+
+ return null;
+ }
+
+ @Override
+ public Slice toByteArray(Event event)
+ {
+ try {
+ return super.toByteArray(event);
+ } catch (RuntimeException re) {
+ logger.warn("Cannot serialize event {}", event, re);
+ }
+
+ return null;
+ }
+
+
+ private static final Logger logger = LoggerFactory.getLogger(ErrorMaskingEventCodec.class);
+}
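
The codec above turns serialization and deserialization failures into a logged warning and a
null result. A minimal sketch of that behavior (the corrupt bytes are made up for illustration;
Kryo would typically throw a RuntimeException on such input):

  ErrorMaskingEventCodec codec = new ErrorMaskingEventCodec();
  // a slice that does not hold a Kryo-serialized event
  Slice corrupt = new Slice(new byte[] {1, 2, 3}, 0, 3);
  // EventCodec.fromByteArray would propagate the RuntimeException here;
  // ErrorMaskingEventCodec logs a warning and returns null instead
  Object event = codec.fromByteArray(corrupt);
  assert event == null;
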
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/storage/EventCodec.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/storage/EventCodec.java b/flume/src/main/java/org/apache/apex/malhar/flume/storage/EventCodec.java
new file mode 100644
index 0000000..463551e
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/storage/EventCodec.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>EventCodec: a StreamCodec that serializes Flume event headers and body with Kryo.</p>
+ *
+ * @since 0.9.4
+ */
+public class EventCodec implements StreamCodec<Event>
+{
+ private final transient Kryo kryo;
+
+ public EventCodec()
+ {
+ this.kryo = new Kryo();
+ this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+ }
+
+ @Override
+ public Object fromByteArray(Slice fragment)
+ {
+ ByteArrayInputStream is = new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length);
+ Input input = new Input(is);
+
+ @SuppressWarnings("unchecked")
+ HashMap<String, String> headers = kryo.readObjectOrNull(input, HashMap.class);
+ byte[] body = kryo.readObjectOrNull(input, byte[].class);
+ return EventBuilder.withBody(body, headers);
+ }
+
+ @Override
+ public Slice toByteArray(Event event)
+ {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ Output output = new Output(os);
+
+ Map<String, String> headers = event.getHeaders();
+ if (headers != null && headers.getClass() != HashMap.class) {
+ HashMap<String, String> tmp = new HashMap<String, String>(headers.size());
+ tmp.putAll(headers);
+ headers = tmp;
+ }
+ kryo.writeObjectOrNull(output, headers, HashMap.class);
+ kryo.writeObjectOrNull(output, event.getBody(), byte[].class);
+ output.flush();
+ final byte[] bytes = os.toByteArray();
+ return new Slice(bytes, 0, bytes.length);
+ }
+
+ @Override
+ public int getPartition(Event o)
+ {
+ return o.hashCode();
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(EventCodec.class);
+}
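
For reference, a hedged round-trip sketch using the codec above (header and body values are
illustrative):

  HashMap<String, String> headers = new HashMap<String, String>();
  headers.put("timestamp", "1495500000000");
  Event original = EventBuilder.withBody("payload".getBytes(), headers);

  EventCodec codec = new EventCodec();
  Slice slice = codec.toByteArray(original);        // headers then body, Kryo-encoded
  Event copy = (Event)codec.fromByteArray(slice);   // rebuilt via EventBuilder.withBody
  assert new String(copy.getBody()).equals("payload");
  assert copy.getHeaders().get("timestamp").equals("1495500000000");
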
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java b/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java
new file mode 100644
index 0000000..77aeb68
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java
@@ -0,0 +1,947 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.storage;
+
+import java.io.DataInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.common.util.NameableThreadFactory;
+import org.apache.apex.malhar.flume.sink.Server;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * HDFSStorage stores data in and retrieves it from HDFS.
+ * <p />
+ * The properties that can be set on HDFSStorage are: <br />
+ * baseDir - the base directory where the data is stored <br />
+ * restore - whether to restore the state after a previous failure <br />
+ * blockSize - the maximum size of each file to be created <br />
+ *
+ * @since 0.9.3
+ */
+public class HDFSStorage implements Storage, Configurable, Component<com.datatorrent.api.Context>
+{
+ public static final int DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
+ public static final String BASE_DIR_KEY = "baseDir";
+ public static final String RESTORE_KEY = "restore";
+ public static final String BLOCKSIZE = "blockSize";
+ public static final String BLOCK_SIZE_MULTIPLE = "blockSizeMultiple";
+ public static final String NUMBER_RETRY = "retryCount";
+
+ private static final String OFFSET_SUFFIX = "-offsetFile";
+ private static final String BOOK_KEEPING_FILE_OFFSET = "-bookKeepingOffsetFile";
+ private static final String FLUSHED_IDENTITY_FILE = "flushedCounter";
+ private static final String CLEAN_OFFSET_FILE = "cleanoffsetFile";
+ private static final String FLUSHED_IDENTITY_FILE_TEMP = "flushedCounter.tmp";
+ private static final String CLEAN_OFFSET_FILE_TEMP = "cleanoffsetFile.tmp";
+ private static final int IDENTIFIER_SIZE = 8;
+ private static final int DATA_LENGTH_BYTE_SIZE = 4;
+
+ /**
+ * Number of times the storage will try to get the filesystem
+ */
+ private int retryCount = 3;
+ /**
+ * The multiple of block size
+ */
+ private int blockSizeMultiple = 1;
+ /**
+ * Identifier for this storage.
+ */
+ @NotNull
+ private String id;
+ /**
+ * The baseDir where the storage facility is going to create files.
+ */
+ @NotNull
+ private String baseDir;
+ /**
+ * The block size to be used to create the storage files
+ */
+ private long blockSize;
+ /**
+ * Whether to restore the state from a previous run
+ */
+ private boolean restore;
+ /**
+ * This identifies the current file number
+ */
+ private long currentWrittenFile;
+ /**
+ * This identifies the file number that has been flushed
+ */
+ private long flushedFileCounter;
+ /**
+ * The file that stores the fileCounter information
+ */
+ // private Path fileCounterFile;
+ /**
+ * The file that stores the flushed fileCounter information
+ */
+ private Path flushedCounterFile;
+ private Path flushedCounterFileTemp;
+ /**
+ * This identifies the last cleaned file number
+ */
+ private long cleanedFileCounter;
+ /**
+ * The file that stores the clean file counter information
+ */
+ // private Path cleanFileCounterFile;
+ /**
+ * The file that stores the clean file offset information
+ */
+ private Path cleanFileOffsetFile;
+ private Path cleanFileOffsetFileTemp;
+ private FileSystem fs;
+ private FSDataOutputStream dataStream;
+ ArrayList<DataBlock> files2Commit = new ArrayList<DataBlock>();
+ /**
+ * The offset in the current opened file
+ */
+ private long fileWriteOffset;
+ private FSDataInputStream readStream;
+ private long retrievalOffset;
+ private long retrievalFile;
+ private int offset;
+ private long flushedLong;
+ private long flushedFileWriteOffset;
+ private long bookKeepingFileOffset;
+ private byte[] cleanedOffset = new byte[8];
+ private long skipOffset;
+ private long skipFile;
+ private transient Path basePath;
+ private ExecutorService storageExecutor;
+ private byte[] currentData;
+ private FSDataInputStream nextReadStream;
+ private long nextFlushedLong;
+ private long nextRetrievalFile;
+ private byte[] nextRetrievalData;
+
+ public HDFSStorage()
+ {
+ this.restore = true;
+ }
+
+ /**
+ * Reads the storage configuration (id, baseDir, restore, blockSize, blockSizeMultiple and retryCount)
+ * from the Flume context
+ *
+ * @param ctx the Flume context carrying the configuration
+ */
+ @Override
+ public void configure(Context ctx)
+ {
+ String tempId = ctx.getString(ID);
+ if (tempId == null) {
+ if (id == null) {
+ throw new IllegalArgumentException("id can't be null.");
+ }
+ } else {
+ id = tempId;
+ }
+
+ String tempBaseDir = ctx.getString(BASE_DIR_KEY);
+ if (tempBaseDir != null) {
+ baseDir = tempBaseDir;
+ }
+
+ restore = ctx.getBoolean(RESTORE_KEY, restore);
+ Long tempBlockSize = ctx.getLong(BLOCKSIZE);
+ if (tempBlockSize != null) {
+ blockSize = tempBlockSize;
+ }
+ blockSizeMultiple = ctx.getInteger(BLOCK_SIZE_MULTIPLE, blockSizeMultiple);
+ retryCount = ctx.getInteger(NUMBER_RETRY, retryCount);
+ }
+
+ /**
+ * This function reads the file at a location and returns the bytes stored in the file
+ *
+ * @param path - the location of the file
+ * @return the bytes read from the file
+ * @throws IOException
+ */
+ byte[] readData(Path path) throws IOException
+ {
+ DataInputStream is = new DataInputStream(fs.open(path));
+ byte[] bytes = new byte[is.available()];
+ is.readFully(bytes);
+ is.close();
+ return bytes;
+ }
+
+ /**
+ * This function writes the bytes to a file specified by the path
+ *
+ * @param path the file location
+ * @param data the data to be written to the file
+ * @return the output stream of the file the data was written to
+ * @throws IOException
+ */
+ private FSDataOutputStream writeData(Path path, byte[] data) throws IOException
+ {
+ FSDataOutputStream fsOutputStream;
+ if (fs.getScheme().equals("file")) {
+ // local FS does not support hflush and does not flush native stream
+ fsOutputStream = new FSDataOutputStream(
+ new FileOutputStream(Path.getPathWithoutSchemeAndAuthority(path).toString()), null);
+ } else {
+ fsOutputStream = fs.create(path);
+ }
+ fsOutputStream.write(data);
+ return fsOutputStream;
+ }
+
+ private long calculateOffset(long fileOffset, long fileCounter)
+ {
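+ // pack the file counter into the high 32 bits and the file offset into the low 32 bits of the address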
+ return ((fileCounter << 32) | (fileOffset & 0xffffffffL));
+ }
+
+ @Override
+ public byte[] store(Slice slice)
+ {
+ // logger.debug("store message ");
+ int bytesToWrite = slice.length + DATA_LENGTH_BYTE_SIZE;
+ if (currentWrittenFile < skipFile) {
+ fileWriteOffset += bytesToWrite;
+ if (fileWriteOffset >= bookKeepingFileOffset) {
+ files2Commit.add(new DataBlock(null, bookKeepingFileOffset,
+ new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), currentWrittenFile));
+ currentWrittenFile++;
+ if (fileWriteOffset > bookKeepingFileOffset) {
+ fileWriteOffset = bytesToWrite;
+ } else {
+ fileWriteOffset = 0;
+ }
+ try {
+ bookKeepingFileOffset = getFlushedFileWriteOffset(
+ new Path(basePath, currentWrittenFile + BOOK_KEEPING_FILE_OFFSET));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return null;
+ }
+
+ if (flushedFileCounter == currentWrittenFile && dataStream == null) {
+ currentWrittenFile++;
+ fileWriteOffset = 0;
+ }
+
+ if (flushedFileCounter == skipFile && skipFile != -1) {
+ skipFile++;
+ }
+
+ if (fileWriteOffset + bytesToWrite < blockSize) {
+ try {
+ /* write length and the actual data to the file */
+ if (fileWriteOffset == 0) {
+ // writeData(flushedCounterFile, String.valueOf(currentWrittenFile).getBytes()).close();
+ dataStream = writeData(new Path(basePath, String.valueOf(currentWrittenFile)),
+ Ints.toByteArray(slice.length));
+ dataStream.write(slice.buffer, slice.offset, slice.length);
+ } else {
+ dataStream.write(Ints.toByteArray(slice.length));
+ dataStream.write(slice.buffer, slice.offset, slice.length);
+ }
+ fileWriteOffset += bytesToWrite;
+
+ byte[] fileOffset = null;
+ if ((currentWrittenFile > skipFile) || (currentWrittenFile == skipFile && fileWriteOffset > skipOffset)) {
+ skipFile = -1;
+ fileOffset = new byte[IDENTIFIER_SIZE];
+ Server.writeLong(fileOffset, 0, calculateOffset(fileWriteOffset, currentWrittenFile));
+ }
+ return fileOffset;
+ } catch (IOException ex) {
+ logger.warn("Error while storing the bytes {}", ex.getMessage());
+ closeFs();
+ throw new RuntimeException(ex);
+ }
+ }
+ DataBlock db = new DataBlock(dataStream, fileWriteOffset,
+ new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), currentWrittenFile);
+ db.close();
+ files2Commit.add(db);
+ fileWriteOffset = 0;
+ ++currentWrittenFile;
+ return store(slice);
+ }
+
+ /**
+ * Reads four bytes starting at startIndex as a little-endian value
+ *
+ * @param b the byte array to read from
+ * @param startIndex the index of the least significant byte
+ * @return the decoded value as a long
+ */
+ long byteArrayToLong(byte[] b, int startIndex)
+ {
+ final byte b1 = 0;
+ return Longs.fromBytes(b1, b1, b1, b1, b[3 + startIndex], b[2 + startIndex], b[1 + startIndex], b[startIndex]);
+ }
+
+ @Override
+ public byte[] retrieve(byte[] identifier)
+ {
+ skipFile = -1;
+ skipOffset = 0;
+ logger.debug("retrieve with address {}", Arrays.toString(identifier));
+ // flushing the last incomplete flushed file
+ closeUnflushedFiles();
+
+ retrievalOffset = byteArrayToLong(identifier, 0);
+ retrievalFile = byteArrayToLong(identifier, offset);
+
+ if (retrievalFile == 0 && retrievalOffset == 0 && currentWrittenFile == 0 && fileWriteOffset == 0) {
+ skipOffset = 0;
+ return null;
+ }
+
+ // making sure that the deleted address is not requested again
+ if (retrievalFile != 0 || retrievalOffset != 0) {
+ long cleanedFile = byteArrayToLong(cleanedOffset, offset);
+ if (retrievalFile < cleanedFile || (retrievalFile == cleanedFile &&
+ retrievalOffset < byteArrayToLong(cleanedOffset, 0))) {
+ logger.warn("The address asked has been deleted retrievalFile={}, cleanedFile={}, retrievalOffset={}, " +
+ "cleanedOffset={}", retrievalFile, cleanedFile, retrievalOffset, byteArrayToLong(cleanedOffset, 0));
+ closeFs();
+ throw new IllegalArgumentException(String.format("The data for address %s has already been deleted",
+ Arrays.toString(identifier)));
+ }
+ }
+
+ // we have just started
+ if (retrievalFile == 0 && retrievalOffset == 0) {
+ retrievalFile = byteArrayToLong(cleanedOffset, offset);
+ retrievalOffset = byteArrayToLong(cleanedOffset, 0);
+ }
+
+ if ((retrievalFile > flushedFileCounter)) {
+ skipFile = retrievalFile;
+ skipOffset = retrievalOffset;
+ retrievalFile = -1;
+ return null;
+ }
+ if ((retrievalFile == flushedFileCounter && retrievalOffset >= flushedFileWriteOffset)) {
+ skipFile = retrievalFile;
+ skipOffset = retrievalOffset - flushedFileWriteOffset;
+ retrievalFile = -1;
+ return null;
+ }
+
+ try {
+ if (readStream != null) {
+ readStream.close();
+ readStream = null;
+ }
+ Path path = new Path(basePath, String.valueOf(retrievalFile));
+ if (!fs.exists(path)) {
+ retrievalFile = -1;
+ closeFs();
+ throw new RuntimeException(String.format("File %s does not exist", path.toString()));
+ }
+
+ byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
+ flushedLong = Server.readLong(flushedOffset, 0);
+ while (retrievalOffset >= flushedLong && retrievalFile < flushedFileCounter) {
+ retrievalOffset -= flushedLong;
+ retrievalFile++;
+ flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
+ flushedLong = Server.readLong(flushedOffset, 0);
+ }
+
+ if (retrievalOffset >= flushedLong) {
+ logger.warn("data not flushed for the given identifier");
+ retrievalFile = -1;
+ return null;
+ }
+ synchronized (HDFSStorage.this) {
+ if (nextReadStream != null) {
+ nextReadStream.close();
+ nextReadStream = null;
+ }
+ }
+ currentData = null;
+ path = new Path(basePath, String.valueOf(retrievalFile));
+ //readStream = new FSDataInputStream(fs.open(path));
+ currentData = readData(path);
+ //readStream.seek(retrievalOffset);
+ storageExecutor.submit(getNextStream());
+ return retrieveHelper();
+ } catch (IOException e) {
+ closeFs();
+ throw new RuntimeException(e);
+ }
+ }
+
+ private byte[] retrieveHelper() throws IOException
+ {
+ int tempRetrievalOffset = (int)retrievalOffset;
+ int length = Ints.fromBytes(currentData[tempRetrievalOffset], currentData[tempRetrievalOffset + 1],
+ currentData[tempRetrievalOffset + 2], currentData[tempRetrievalOffset + 3]);
+ byte[] data = new byte[length + IDENTIFIER_SIZE];
+ System.arraycopy(currentData, tempRetrievalOffset + 4, data, IDENTIFIER_SIZE, length);
+ retrievalOffset += length + DATA_LENGTH_BYTE_SIZE;
+ if (retrievalOffset >= flushedLong) {
+ Server.writeLong(data, 0, calculateOffset(0, retrievalFile + 1));
+ } else {
+ Server.writeLong(data, 0, calculateOffset(retrievalOffset, retrievalFile));
+ }
+ return data;
+ }
+
+ @Override
+ public byte[] retrieveNext()
+ {
+ if (retrievalFile == -1) {
+ closeFs();
+ throw new RuntimeException("Call retrieve first");
+ }
+
+ if (retrievalFile > flushedFileCounter) {
+ logger.warn("data is not flushed");
+ return null;
+ }
+
+ try {
+ if (currentData == null) {
+ synchronized (HDFSStorage.this) {
+ if (nextRetrievalData != null && (retrievalFile == nextRetrievalFile)) {
+ currentData = nextRetrievalData;
+ flushedLong = nextFlushedLong;
+ nextRetrievalData = null;
+ } else {
+ currentData = null;
+ currentData = readData(new Path(basePath, String.valueOf(retrievalFile)));
+ byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
+ flushedLong = Server.readLong(flushedOffset, 0);
+ }
+ }
+ storageExecutor.submit(getNextStream());
+ }
+
+ if (retrievalOffset >= flushedLong) {
+ retrievalFile++;
+ retrievalOffset = 0;
+
+ if (retrievalFile > flushedFileCounter) {
+ logger.warn("data is not flushed");
+ return null;
+ }
+
+ //readStream.close();
+ // readStream = new FSDataInputStream(fs.open(new Path(basePath, String.valueOf(retrievalFile))));
+ // byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
+ // flushedLong = Server.readLong(flushedOffset, 0);
+
+ synchronized (HDFSStorage.this) {
+ if (nextRetrievalData != null && (retrievalFile == nextRetrievalFile)) {
+ currentData = nextRetrievalData;
+ flushedLong = nextFlushedLong;
+ nextRetrievalData = null;
+ } else {
+ currentData = null;
+ currentData = readData(new Path(basePath, String.valueOf(retrievalFile)));
+ byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
+ flushedLong = Server.readLong(flushedOffset, 0);
+ }
+ }
+ storageExecutor.submit(getNextStream());
+ }
+ //readStream.seek(retrievalOffset);
+ return retrieveHelper();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter")
+ public void clean(byte[] identifier)
+ {
+ logger.info("clean {}", Arrays.toString(identifier));
+ long cleanFileIndex = byteArrayToLong(identifier, offset);
+
+ long cleanFileOffset = byteArrayToLong(identifier, 0);
+ if (flushedFileCounter == -1) {
+ identifier = new byte[8];
+ } else if (cleanFileIndex > flushedFileCounter ||
+ (cleanFileIndex == flushedFileCounter && cleanFileOffset >= flushedFileWriteOffset)) {
+ // This is to make sure that we clean only the data that is flushed
+ cleanFileIndex = flushedFileCounter;
+ cleanFileOffset = flushedFileWriteOffset;
+ Server.writeLong(identifier, 0, calculateOffset(cleanFileOffset, cleanFileIndex));
+ }
+ cleanedOffset = identifier;
+
+ try {
+ writeData(cleanFileOffsetFileTemp, identifier).close();
+ fs.rename(cleanFileOffsetFileTemp, cleanFileOffsetFile);
+ if (cleanedFileCounter >= cleanFileIndex) {
+ return;
+ }
+ do {
+ Path path = new Path(basePath, String.valueOf(cleanedFileCounter));
+ if (fs.exists(path) && fs.isFile(path)) {
+ fs.delete(path, false);
+ }
+ path = new Path(basePath, cleanedFileCounter + OFFSET_SUFFIX);
+ if (fs.exists(path) && fs.isFile(path)) {
+ fs.delete(path, false);
+ }
+ path = new Path(basePath, cleanedFileCounter + BOOK_KEEPING_FILE_OFFSET);
+ if (fs.exists(path) && fs.isFile(path)) {
+ fs.delete(path, false);
+ }
+ logger.info("deleted file {}", cleanedFileCounter);
+ ++cleanedFileCounter;
+ } while (cleanedFileCounter < cleanFileIndex);
+ // writeData(cleanFileCounterFile, String.valueOf(cleanedFileCounter).getBytes()).close();
+
+ } catch (IOException e) {
+ logger.warn("not able to close the streams {}", e.getMessage());
+ closeFs();
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * This is used mainly to clean up the counter files created
+ */
+ void cleanHelperFiles()
+ {
+ try {
+ fs.delete(basePath, true);
+ } catch (IOException e) {
+ logger.warn(e.getMessage());
+ }
+ }
+
+ private void closeUnflushedFiles()
+ {
+ try {
+ files2Commit.clear();
+ // closing the stream
+ if (dataStream != null) {
+ dataStream.close();
+ dataStream = null;
+ // currentWrittenFile++;
+ // fileWriteOffset = 0;
+ }
+
+ if (!fs.exists(new Path(basePath, currentWrittenFile + OFFSET_SUFFIX))) {
+ fs.delete(new Path(basePath, String.valueOf(currentWrittenFile)), false);
+ }
+
+ if (fs.exists(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX))) {
+ // This means that flush was called
+ flushedFileWriteOffset = getFlushedFileWriteOffset(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX));
+ bookKeepingFileOffset = getFlushedFileWriteOffset(
+ new Path(basePath, flushedFileCounter + BOOK_KEEPING_FILE_OFFSET));
+ }
+
+ if (flushedFileCounter != -1) {
+ currentWrittenFile = flushedFileCounter;
+ fileWriteOffset = flushedFileWriteOffset;
+ } else {
+ currentWrittenFile = 0;
+ fileWriteOffset = 0;
+ }
+
+ flushedLong = 0;
+
+ } catch (IOException e) {
+ closeFs();
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void flush()
+ {
+ nextReadStream = null;
+ StringBuilder builder = new StringBuilder();
+ Iterator<DataBlock> itr = files2Commit.iterator();
+ DataBlock db;
+ try {
+ while (itr.hasNext()) {
+ db = itr.next();
+ db.updateOffsets();
+ builder.append(db.fileName).append(", ");
+ }
+ files2Commit.clear();
+
+ if (dataStream != null) {
+ dataStream.hflush();
+ writeData(flushedCounterFileTemp, String.valueOf(currentWrittenFile).getBytes()).close();
+ fs.rename(flushedCounterFileTemp, flushedCounterFile);
+ updateFlushedOffset(new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), fileWriteOffset);
+ flushedFileWriteOffset = fileWriteOffset;
+ builder.append(currentWrittenFile);
+ }
+ logger.debug("flushed files {}", builder.toString());
+ } catch (IOException ex) {
+ logger.warn("not able to close the stream {}", ex.getMessage());
+ closeFs();
+ throw new RuntimeException(ex);
+ }
+ flushedFileCounter = currentWrittenFile;
+ // logger.debug("flushedFileCounter in flush {}",flushedFileCounter);
+ }
+
+ /**
+ * This updates the flushed offset
+ */
+ private void updateFlushedOffset(Path file, long bytesWritten)
+ {
+ byte[] lastStoredOffset = new byte[IDENTIFIER_SIZE];
+ Server.writeLong(lastStoredOffset, 0, bytesWritten);
+ try {
+ writeData(file, lastStoredOffset).close();
+ } catch (IOException e) {
+ try {
+ if (!Arrays.equals(readData(file), lastStoredOffset)) {
+ closeFs();
+ throw new RuntimeException(e);
+ }
+ } catch (Exception e1) {
+ closeFs();
+ throw new RuntimeException(e1);
+ }
+ }
+ }
+
+ public int getBlockSizeMultiple()
+ {
+ return blockSizeMultiple;
+ }
+
+ public void setBlockSizeMultiple(int blockSizeMultiple)
+ {
+ this.blockSizeMultiple = blockSizeMultiple;
+ }
+
+ /**
+ * @return the baseDir
+ */
+ public String getBaseDir()
+ {
+ return baseDir;
+ }
+
+ /**
+ * @param baseDir the baseDir to set
+ */
+ public void setBaseDir(String baseDir)
+ {
+ this.baseDir = baseDir;
+ }
+
+ /**
+ * @return the id
+ */
+ public String getId()
+ {
+ return id;
+ }
+
+ /**
+ * @param id the id to set
+ */
+ public void setId(String id)
+ {
+ this.id = id;
+ }
+
+ /**
+ * @return the blockSize
+ */
+ public long getBlockSize()
+ {
+ return blockSize;
+ }
+
+ /**
+ * @param blockSize the blockSize to set
+ */
+ public void setBlockSize(long blockSize)
+ {
+ this.blockSize = blockSize;
+ }
+
+ /**
+ * @return the restore
+ */
+ public boolean isRestore()
+ {
+ return restore;
+ }
+
+ /**
+ * @param restore the restore to set
+ */
+ public void setRestore(boolean restore)
+ {
+ this.restore = restore;
+ }
+
+ class DataBlock
+ {
+ FSDataOutputStream dataStream;
+ long dataOffset;
+ Path path2FlushedData;
+ long fileName;
+ private Path bookKeepingPath;
+
+ DataBlock(FSDataOutputStream stream, long bytesWritten, Path path2FlushedData, long fileName)
+ {
+ this.dataStream = stream;
+ this.dataOffset = bytesWritten;
+ this.path2FlushedData = path2FlushedData;
+ this.fileName = fileName;
+ }
+
+ public void close()
+ {
+ if (dataStream != null) {
+ try {
+ dataStream.close();
+ bookKeepingPath = new Path(basePath, fileName + BOOK_KEEPING_FILE_OFFSET);
+ updateFlushedOffset(bookKeepingPath, dataOffset);
+ } catch (IOException ex) {
+ logger.warn("not able to close the stream {}", ex.getMessage());
+ closeFs();
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ public void updateOffsets() throws IOException
+ {
+ updateFlushedOffset(path2FlushedData, dataOffset);
+ if (bookKeepingPath != null && fs.exists(bookKeepingPath)) {
+ fs.delete(bookKeepingPath, false);
+ }
+ }
+
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(HDFSStorage.class);
+
+ @Override
+ public void setup(com.datatorrent.api.Context context)
+ {
+ Configuration conf = new Configuration();
+ if (baseDir == null) {
+ baseDir = conf.get("hadoop.tmp.dir");
+ if (baseDir == null || baseDir.isEmpty()) {
+ throw new IllegalArgumentException("baseDir cannot be null.");
+ }
+ }
+ offset = 4;
+ skipOffset = -1;
+ skipFile = -1;
+ int tempRetryCount = 0;
+ while (tempRetryCount < retryCount && fs == null) {
+ // increment before the attempt so a persistent failure cannot loop forever
+ tempRetryCount++;
+ try {
+ fs = FileSystem.newInstance(conf);
+ } catch (Throwable throwable) {
+ logger.warn("Not able to get file system ", throwable);
+ }
+ }
+
+ try {
+ Path path = new Path(baseDir);
+ basePath = new Path(path, id);
+ if (fs == null) {
+ fs = FileSystem.newInstance(conf);
+ }
+ if (!fs.exists(path)) {
+ closeFs();
+ throw new RuntimeException(String.format("baseDir passed (%s) doesn't exist.", baseDir));
+ }
+ if (!fs.isDirectory(path)) {
+ closeFs();
+ throw new RuntimeException(String.format("baseDir passed (%s) is not a directory.", baseDir));
+ }
+ if (!restore) {
+ fs.delete(basePath, true);
+ }
+ if (!fs.exists(basePath) || !fs.isDirectory(basePath)) {
+ fs.mkdirs(basePath);
+ }
+
+ if (blockSize == 0) {
+ blockSize = fs.getDefaultBlockSize(new Path(basePath, "tempData"));
+ }
+ if (blockSize == 0) {
+ blockSize = DEFAULT_BLOCK_SIZE;
+ }
+
+ blockSize = blockSizeMultiple * blockSize;
+
+ currentWrittenFile = 0;
+ cleanedFileCounter = -1;
+ retrievalFile = -1;
+ // fileCounterFile = new Path(basePath, IDENTITY_FILE);
+ flushedFileCounter = -1;
+ // cleanFileCounterFile = new Path(basePath, CLEAN_FILE);
+ cleanFileOffsetFile = new Path(basePath, CLEAN_OFFSET_FILE);
+ cleanFileOffsetFileTemp = new Path(basePath, CLEAN_OFFSET_FILE_TEMP);
+ flushedCounterFile = new Path(basePath, FLUSHED_IDENTITY_FILE);
+ flushedCounterFileTemp = new Path(basePath, FLUSHED_IDENTITY_FILE_TEMP);
+
+ if (restore) {
+ //
+ // if (fs.exists(fileCounterFile) && fs.isFile(fileCounterFile)) {
+ // //currentWrittenFile = Long.valueOf(new String(readData(fileCounterFile)));
+ // }
+
+ if (fs.exists(cleanFileOffsetFile) && fs.isFile(cleanFileOffsetFile)) {
+ cleanedOffset = readData(cleanFileOffsetFile);
+ }
+
+ if (fs.exists(flushedCounterFile) && fs.isFile(flushedCounterFile)) {
+ String strFlushedFileCounter = new String(readData(flushedCounterFile));
+ if (strFlushedFileCounter.isEmpty()) {
+ logger.warn("empty flushed file");
+ } else {
+ flushedFileCounter = Long.valueOf(strFlushedFileCounter);
+ flushedFileWriteOffset = getFlushedFileWriteOffset(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX));
+ bookKeepingFileOffset = getFlushedFileWriteOffset(
+ new Path(basePath, flushedFileCounter + BOOK_KEEPING_FILE_OFFSET));
+ }
+
+ }
+ }
+ fileWriteOffset = flushedFileWriteOffset;
+ currentWrittenFile = flushedFileCounter;
+ cleanedFileCounter = byteArrayToLong(cleanedOffset, offset) - 1;
+ if (currentWrittenFile == -1) {
+ ++currentWrittenFile;
+ fileWriteOffset = 0;
+ }
+
+ } catch (IOException io) {
+
+ throw new RuntimeException(io);
+ }
+ storageExecutor = Executors.newSingleThreadExecutor(new NameableThreadFactory("StorageHelper"));
+ }
+
+ private void closeFs()
+ {
+ if (fs != null) {
+ try {
+ fs.close();
+ fs = null;
+ } catch (IOException e) {
+ logger.debug(e.getMessage());
+ }
+ }
+ }
+
+ private long getFlushedFileWriteOffset(Path filePath) throws IOException
+ {
+ if (flushedFileCounter != -1 && fs.exists(filePath)) {
+ byte[] flushedFileOffsetByte = readData(filePath);
+ if (flushedFileOffsetByte != null && flushedFileOffsetByte.length == 8) {
+ return Server.readLong(flushedFileOffsetByte, 0);
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ public void teardown()
+ {
+ logger.debug("called teardown");
+ try {
+ if (readStream != null) {
+ readStream.close();
+ }
+ synchronized (HDFSStorage.this) {
+ if (nextReadStream != null) {
+ nextReadStream.close();
+ }
+ }
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ closeUnflushedFiles();
+ storageExecutor.shutdown();
+ }
+
+ }
+
+ private Runnable getNextStream()
+ {
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try {
+ synchronized (HDFSStorage.this) {
+ nextRetrievalFile = retrievalFile + 1;
+ if (nextRetrievalFile > flushedFileCounter) {
+ nextRetrievalData = null;
+ return;
+ }
+ Path path = new Path(basePath, String.valueOf(nextRetrievalFile));
+ Path offsetPath = new Path(basePath, nextRetrievalFile + OFFSET_SUFFIX);
+ nextRetrievalData = null;
+ nextRetrievalData = readData(path);
+ byte[] flushedOffset = readData(offsetPath);
+ nextFlushedLong = Server.readLong(flushedOffset, 0);
+ }
+ } catch (Throwable e) {
+ logger.warn("in storage executor ", e);
+
+ }
+ }
+ };
+ }
+
+}
+
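
A hedged usage sketch of the storage above (paths and values are illustrative; the baseDir must
already exist on the filesystem, and stored data becomes retrievable only after flush()):

  HDFSStorage storage = new HDFSStorage();
  storage.setId("sink1-storage");       // files are created under baseDir/id
  storage.setBaseDir("/tmp/flume101");
  storage.setRestore(false);
  storage.setup(null);                  // the context argument is not used by setup above

  byte[] address = storage.store(new Slice("event".getBytes(), 0, 5));
  storage.flush();
  byte[] first = storage.retrieve(new byte[8]); // all-zero address reads from the beginning
  // first[0..7] hold the address of the next item, the remaining bytes are the stored payload
  storage.clean(address);               // discard everything up to the returned address
  storage.teardown();
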
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/storage/Storage.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/storage/Storage.java b/flume/src/main/java/org/apache/apex/malhar/flume/storage/Storage.java
new file mode 100644
index 0000000..add1831
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/storage/Storage.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.storage;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>Storage interface for persisting byte payloads and retrieving them by an eight-byte address.</p>
+ *
+ * @since 0.9.2
+ */
+public interface Storage
+{
+ /**
+ * Key in the context for the unique identifier of the storage, which may be used to recover from failure.
+ */
+ String ID = "id";
+
+ /**
+ * This stores the bytes and returns the unique identifier to retrieve these bytes
+ *
+ * @param bytes the bytes to store
+ * @return the unique identifier (address) of the stored bytes, or null if no address is available yet
+ */
+ byte[] store(Slice bytes);
+
+ /**
+ * This returns the data bytes for the given identifier and the identifier for the next data bytes. <br/>
+ * The first eight bytes contain the identifier and the remaining bytes contain the data
+ *
+ * @param identifier the identifier of the data to retrieve
+ * @return the identifier of the next data followed by the data bytes
+ */
+ byte[] retrieve(byte[] identifier);
+
+ /**
+ * This returns the data bytes and the identifier for the next data bytes. The identifier of the current data
+ * bytes is determined by the last retrieve call and the number of retrieveNext calls made since then. <br/>
+ * The first eight bytes contain the identifier and the remaining bytes contain the data
+ *
+ * @return the identifier of the next data followed by the data bytes
+ */
+ byte[] retrieveNext();
+
+ /**
+ * This is used to clean up the files up to the given identifier
+ *
+ * @param identifier the identifier up to which stored data may be deleted
+ */
+ void clean(byte[] identifier);
+
+ /**
+ * This flushes the data from the stream; data becomes retrievable only after it has been flushed
+ *
+ */
+ void flush();
+
+}
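
Taken together, a hedged sketch of how a client might drain a Storage implementation, given the
identifier layout described above (the drain method and the consumer hand-off are hypothetical):

  static void drain(Storage storage, byte[] startAddress)
  {
    byte[] response = storage.retrieve(startAddress); // e.g. new byte[8] to start from the beginning
    while (response != null) {
      // the first eight bytes carry the identifier of the next item
      byte[] next = java.util.Arrays.copyOfRange(response, 0, 8);
      byte[] payload = java.util.Arrays.copyOfRange(response, 8, response.length);
      // ... hand payload to the consumer ...
      response = storage.retrieveNext();
    }
  }
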
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/resources/flume-conf/flume-conf.sample.properties
----------------------------------------------------------------------
diff --git a/flume/src/main/resources/flume-conf/flume-conf.sample.properties b/flume/src/main/resources/flume-conf/flume-conf.sample.properties
index 1782d4a..af59e52 100644
--- a/flume/src/main/resources/flume-conf/flume-conf.sample.properties
+++ b/flume/src/main/resources/flume-conf/flume-conf.sample.properties
@@ -23,7 +23,7 @@
agent1.sinks = dt
# first sink - dt
- agent1.sinks.dt.type = com.datatorrent.flume.sink.DTFlumeSink
+ agent1.sinks.dt.type = org.apache.apex.malhar.flume.sink.DTFlumeSink
agent1.sinks.dt.id = sink1
agent1.sinks.dt.hostname = localhost
agent1.sinks.dt.port = 8080
@@ -31,7 +31,7 @@
agent1.sinks.dt.throughputAdjustmentFactor = 2
agent1.sinks.dt.maximumEventsPerTransaction = 5000
agent1.sinks.dt.minimumEventsPerTransaction = 1
- agent1.sinks.dt.storage = com.datatorrent.flume.storage.HDFSStorage
+ agent1.sinks.dt.storage = org.apache.apex.malhar.flume.storage.HDFSStorage
agent1.sinks.dt.storage.restore = false
agent1.sinks.dt.storage.baseDir = /tmp/flume101
agent1.sinks.dt.channel = ch1
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java b/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java
deleted file mode 100644
index f182edc..0000000
--- a/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.discovery;
-
-import org.codehaus.jackson.type.TypeReference;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.curator.x.discovery.ServiceInstance;
-import org.apache.curator.x.discovery.details.InstanceSerializer;
-
-import com.datatorrent.flume.discovery.Discovery.Service;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- *
- */
-@Ignore
-public class ZKAssistedDiscoveryTest
-{
- public ZKAssistedDiscoveryTest()
- {
- }
-
- @Test
- public void testSerialization() throws Exception
- {
- ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
- discovery.setServiceName("DTFlumeTest");
- discovery.setConnectionString("localhost:2181");
- discovery.setBasePath("/HelloDT");
- discovery.setup(null);
- ServiceInstance<byte[]> instance = discovery.getInstance(new Service<byte[]>()
- {
- @Override
- public String getHost()
- {
- return "localhost";
- }
-
- @Override
- public int getPort()
- {
- return 8080;
- }
-
- @Override
- public byte[] getPayload()
- {
- return null;
- }
-
- @Override
- public String getId()
- {
- return "localhost8080";
- }
-
- });
- InstanceSerializer<byte[]> instanceSerializer =
- discovery.getInstanceSerializerFactory().getInstanceSerializer(new TypeReference<ServiceInstance<byte[]>>()
- {
- });
- byte[] serialize = instanceSerializer.serialize(instance);
- logger.debug("serialized json = {}", new String(serialize));
- ServiceInstance<byte[]> deserialize = instanceSerializer.deserialize(serialize);
- assertArrayEquals("Metadata", instance.getPayload(), deserialize.getPayload());
- }
-
- @Test
- public void testDiscover()
- {
- ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
- discovery.setServiceName("DTFlumeTest");
- discovery.setConnectionString("localhost:2181");
- discovery.setBasePath("/HelloDT");
- discovery.setup(null);
- assertNotNull("Discovered Sinks", discovery.discover());
- discovery.teardown();
- }
-
- @Test
- public void testAdvertize()
- {
- ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
- discovery.setServiceName("DTFlumeTest");
- discovery.setConnectionString("localhost:2181");
- discovery.setBasePath("/HelloDT");
- discovery.setup(null);
-
- Service<byte[]> service = new Service<byte[]>()
- {
- @Override
- public String getHost()
- {
- return "chetan";
- }
-
- @Override
- public int getPort()
- {
- return 5033;
- }
-
- @Override
- public byte[] getPayload()
- {
- return new byte[] {3, 2, 1};
- }
-
- @Override
- public String getId()
- {
- return "uniqueId";
- }
-
- };
- discovery.advertise(service);
- discovery.teardown();
- }
-
- private static final Logger logger = LoggerFactory.getLogger(ZKAssistedDiscoveryTest.class);
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java b/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java
deleted file mode 100644
index 8256916..0000000
--- a/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.integration;
-
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Event;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.flume.operator.AbstractFlumeInputOperator;
-import com.datatorrent.flume.storage.EventCodec;
-
-/**
- *
- */
-@Ignore
-public class ApplicationTest implements StreamingApplication
-{
- public static class FlumeInputOperator extends AbstractFlumeInputOperator<Event>
- {
- @Override
- public Event convert(Event event)
- {
- return event;
- }
- }
-
- public static class Counter implements Operator
- {
- private int count;
- private transient Event event;
- public final transient DefaultInputPort<Event> input = new DefaultInputPort<Event>()
- {
- @Override
- public void process(Event tuple)
- {
- count++;
- event = tuple;
- }
-
- };
-
- @Override
- public void beginWindow(long windowId)
- {
- }
-
- @Override
- public void endWindow()
- {
- logger.debug("total count = {}, tuple = {}", count, event);
- }
-
- @Override
- public void setup(OperatorContext context)
- {
- }
-
- @Override
- public void teardown()
- {
- }
-
- private static final Logger logger = LoggerFactory.getLogger(Counter.class);
- }
-
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
- dag.setAttribute(com.datatorrent.api.Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 1000);
- FlumeInputOperator flume = dag.addOperator("FlumeOperator", new FlumeInputOperator());
- flume.setConnectAddresses(new String[]{"test:127.0.0.1:8080"});
- flume.setCodec(new EventCodec());
- Counter counter = dag.addOperator("Counter", new Counter());
-
- dag.addStream("Slices", flume.output, counter.input).setLocality(Locality.CONTAINER_LOCAL);
- }
-
- @Test
- public void test()
- {
- try {
- LocalMode.runApp(this, Integer.MAX_VALUE);
- } catch (Exception ex) {
- logger.warn("The dag seems to be not testable yet, if it's - remove this exception handling", ex);
- }
-
- }
-
- private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class);
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java
deleted file mode 100644
index aca99c3..0000000
--- a/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.interceptor;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.flume.Context;
-import org.apache.flume.interceptor.Interceptor;
-
-import static org.junit.Assert.assertArrayEquals;
-
-/**
- * Tests for {@link ColumnFilteringFormattingInterceptor}
- */
-public class ColumnFilteringFormattingInterceptorTest
-{
- private static InterceptorTestHelper helper;
-
- @BeforeClass
- public static void startUp()
- {
- HashMap<String, String> contextMap = new HashMap<String, String>();
- contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
- contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "{1}\001{2}\001{3}\001");
-
- helper = new InterceptorTestHelper(new ColumnFilteringFormattingInterceptor.Builder(), contextMap);
- }
-
- @Test
- public void testInterceptEvent()
- {
- helper.testIntercept_Event();
- }
-
- @Test
- public void testFiles() throws IOException, URISyntaxException
- {
- helper.testFiles();
- }
-
- @Test
- public void testInterceptEventWithPrefix()
- {
- HashMap<String, String> contextMap = new HashMap<String, String>();
- contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
- contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "\001{1}\001{2}\001{3}\001");
-
- ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
- builder.configure(new Context(contextMap));
- Interceptor interceptor = builder.build();
-
- assertArrayEquals("Six Fields",
- "\001\001Second\001\001".getBytes(),
- interceptor.intercept(
- new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody());
- }
-
- @Test
- public void testInterceptEventWithLongSeparator()
- {
- HashMap<String, String> contextMap = new HashMap<String, String>();
- contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
- contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "a{1}bc{2}def{3}ghi");
-
- ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
- builder.configure(new Context(contextMap));
- Interceptor interceptor = builder.build();
- byte[] body = interceptor.intercept(
- new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody();
-
- assertArrayEquals("Six Fields, " + new String(body), "abcSeconddefghi".getBytes(), body);
- }
-
- @Test
- public void testInterceptEventWithTerminatingSeparator()
- {
- HashMap<String, String> contextMap = new HashMap<String, String>();
- contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
- contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "a{1}bc{2}def{3}");
-
- ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
- builder.configure(new Context(contextMap));
- Interceptor interceptor = builder.build();
- byte[] body = interceptor.intercept(
- new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody();
-
- assertArrayEquals("Six Fields, " + new String(body), "abcSeconddef".getBytes(), body);
- }
-
- @Test
- public void testInterceptEventWithColumnZero()
- {
- HashMap<String, String> contextMap = new HashMap<String, String>();
- contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
- contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "{0}\001");
-
- ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
- builder.configure(new Context(contextMap));
- Interceptor interceptor = builder.build();
-
- assertArrayEquals("Empty Bytes",
- "\001".getBytes(),
- interceptor.intercept(new InterceptorTestHelper.MyEvent("".getBytes())).getBody());
-
- assertArrayEquals("One Field",
- "First\001".getBytes(),
- interceptor.intercept(new InterceptorTestHelper.MyEvent("First".getBytes())).getBody());
-
- assertArrayEquals("Two Fields",
- "\001".getBytes(),
- interceptor.intercept(new InterceptorTestHelper.MyEvent("\002First".getBytes())).getBody());
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java
deleted file mode 100644
index 11ee23f..0000000
--- a/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.interceptor;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.flume.Context;
-import org.apache.flume.interceptor.Interceptor;
-
-import static org.junit.Assert.assertArrayEquals;
-
-/**
- *
- */
-public class ColumnFilteringInterceptorTest
-{
- private static InterceptorTestHelper helper;
-
- @BeforeClass
- public static void startUp()
- {
- HashMap<String, String> contextMap = new HashMap<String, String>();
- contextMap.put(ColumnFilteringInterceptor.Constants.DST_SEPARATOR, Byte.toString((byte)1));
- contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
- contextMap.put(ColumnFilteringInterceptor.Constants.COLUMNS, "1 2 3");
-
- helper = new InterceptorTestHelper(new ColumnFilteringInterceptor.Builder(), contextMap);
- }
-
- @Test
- public void testInterceptEvent()
- {
- helper.testIntercept_Event();
- }
-
- @Test
- public void testFiles() throws IOException, URISyntaxException
- {
- helper.testFiles();
- }
-
- @Test
- public void testInterceptEventWithColumnZero()
- {
- HashMap<String, String> contextMap = new HashMap<String, String>();
- contextMap.put(ColumnFilteringInterceptor.Constants.DST_SEPARATOR, Byte.toString((byte)1));
- contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
- contextMap.put(ColumnFilteringInterceptor.Constants.COLUMNS, "0");
-
- ColumnFilteringInterceptor.Builder builder = new ColumnFilteringInterceptor.Builder();
- builder.configure(new Context(contextMap));
- Interceptor interceptor = builder.build();
-
- assertArrayEquals("Empty Bytes",
- "\001".getBytes(),
- interceptor.intercept(new InterceptorTestHelper.MyEvent("".getBytes())).getBody());
-
- assertArrayEquals("One Field",
- "First\001".getBytes(),
- interceptor.intercept(new InterceptorTestHelper.MyEvent("First".getBytes())).getBody());
-
- assertArrayEquals("Two Fields",
- "\001".getBytes(),
- interceptor.intercept(new InterceptorTestHelper.MyEvent("\002First".getBytes())).getBody());
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java b/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java
deleted file mode 100644
index dc95f08..0000000
--- a/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.interceptor;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.interceptor.Interceptor;
-
-import com.datatorrent.netlet.util.Slice;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- *
- */
-public class InterceptorTestHelper
-{
- private static final byte FIELD_SEPARATOR = 1;
-
- static class MyEvent implements Event
- {
- byte[] body;
-
- MyEvent(byte[] bytes)
- {
- body = bytes;
- }
-
- @Override
- public Map<String, String> getHeaders()
- {
- return null;
- }
-
- @Override
- public void setHeaders(Map<String, String> map)
- {
- }
-
- @Override
- @SuppressWarnings("ReturnOfCollectionOrArrayField")
- public byte[] getBody()
- {
- return body;
- }
-
- @Override
- @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter")
- public void setBody(byte[] bytes)
- {
- body = bytes;
- }
- }
-
- private final Interceptor.Builder builder;
- private final Map<String, String> context;
-
- InterceptorTestHelper(Interceptor.Builder builder, Map<String, String> context)
- {
- this.builder = builder;
- this.context = context;
- }
-
- public void testIntercept_Event()
- {
- builder.configure(new Context(context));
- Interceptor interceptor = builder.build();
-
- assertArrayEquals("Empty Bytes",
- "\001\001\001".getBytes(),
- interceptor.intercept(new MyEvent("".getBytes())).getBody());
-
- assertArrayEquals("One Separator",
- "\001\001\001".getBytes(),
- interceptor.intercept(new MyEvent("\002".getBytes())).getBody());
-
- assertArrayEquals("Two Separators",
- "\001\001\001".getBytes(),
- interceptor.intercept(new MyEvent("\002\002".getBytes())).getBody());
-
- assertArrayEquals("One Field",
- "\001\001\001".getBytes(),
- interceptor.intercept(new MyEvent("First".getBytes())).getBody());
-
- assertArrayEquals("Two Fields",
- "First\001\001\001".getBytes(),
- interceptor.intercept(new MyEvent("\002First".getBytes())).getBody());
-
- assertArrayEquals("Two Fields",
- "\001\001\001".getBytes(),
- interceptor.intercept(new MyEvent("First\001".getBytes())).getBody());
-
- assertArrayEquals("Two Fields",
- "Second\001\001\001".getBytes(),
- interceptor.intercept(new MyEvent("First\002Second".getBytes())).getBody());
-
- assertArrayEquals("Three Fields",
- "Second\001\001\001".getBytes(),
- interceptor.intercept(new MyEvent("First\002Second\002".getBytes())).getBody());
-
- assertArrayEquals("Three Fields",
- "\001Second\001\001".getBytes(),
- interceptor.intercept(new MyEvent("First\002\002Second".getBytes())).getBody());
-
- assertArrayEquals("Four Fields",
- "\001Second\001\001".getBytes(),
- interceptor.intercept(new MyEvent("First\002\002Second\002".getBytes())).getBody());
-
- assertArrayEquals("Five Fields",
- "\001Second\001\001".getBytes(),
- interceptor.intercept(new MyEvent("First\002\002Second\002\002".getBytes())).getBody());
-
- assertArrayEquals("Six Fields",
- "\001Second\001\001".getBytes(),
- interceptor.intercept(new MyEvent("First\002\002Second\002\002\002".getBytes())).getBody());
- }
-
- public void testFiles() throws IOException, URISyntaxException
- {
- Properties properties = new Properties();
- properties.load(getClass().getResourceAsStream("/flume/conf/flume-conf.properties"));
-
- String interceptor = null;
- for (Entry<Object, Object> entry : properties.entrySet()) {
- logger.debug("{} => {}", entry.getKey(), entry.getValue());
-
- if (builder.getClass().getName().equals(entry.getValue().toString())) {
- String key = entry.getKey().toString();
- if (key.endsWith(".type")) {
- interceptor = key.substring(0, key.length() - "type".length());
- break;
- }
- }
- }
-
- assertNotNull(builder.getClass().getName(), interceptor);
- @SuppressWarnings({"null", "ConstantConditions"})
- final int interceptorLength = interceptor.length();
-
- HashMap<String, String> map = new HashMap<String, String>();
- for (Entry<Object, Object> entry : properties.entrySet()) {
- String key = entry.getKey().toString();
- if (key.startsWith(interceptor)) {
- map.put(key.substring(interceptorLength), entry.getValue().toString());
- }
- }
-
- builder.configure(new Context(map));
- Interceptor interceptorInstance = builder.build();
-
- URL url = getClass().getResource("/test_data/gentxns/");
- assertNotNull("Generated Transactions", url);
-
- int records = 0;
- File dir = new File(url.toURI());
- for (File file : dir.listFiles()) {
- records += processFile(file, interceptorInstance);
- }
-
- Assert.assertEquals("Total Records", 2200, records);
- }
-
- private int processFile(File file, Interceptor interceptor) throws IOException
- {
- InputStream stream = getClass().getResourceAsStream("/test_data/gentxns/" + file.getName());
- BufferedReader br = new BufferedReader(new InputStreamReader(stream));
-
- String line;
- int i = 0;
- while ((line = br.readLine()) != null) {
- byte[] body = interceptor.intercept(new MyEvent(line.getBytes())).getBody();
- RawEvent event = RawEvent.from(body, FIELD_SEPARATOR);
- Assert.assertEquals("GUID", new Slice(line.getBytes(), 0, 32), event.guid);
- logger.debug("guid = {}, time = {}", event.guid, event.time);
- i++;
- }
-
- br.close();
- return i;
- }
-
- private static final Logger logger = LoggerFactory.getLogger(InterceptorTestHelper.class);
-}
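A note on testFiles(): it derives the interceptor's property prefix by scanning flume-conf.properties for a key ending in ".type" whose value is the builder's class name, then strips only the "type" suffix so the trailing dot survives; every key under that prefix is copied, suffix-only, into the Context. A small sketch of that extraction with hypothetical property keys:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PrefixSketch
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    // hypothetical entries shaped like flume-conf.properties
    props.put("a1.sources.r1.interceptors.i1.type", "com.example.FooInterceptor$Builder");
    props.put("a1.sources.r1.interceptors.i1.columns", "1 2 3");

    String prefix = null;
    for (Map.Entry<Object, Object> e : props.entrySet()) {
      String key = e.getKey().toString();
      if (key.endsWith(".type") && "com.example.FooInterceptor$Builder".equals(e.getValue())) {
        prefix = key.substring(0, key.length() - "type".length()); // keeps the trailing dot
        break;
      }
    }

    Map<String, String> context = new HashMap<String, String>();
    for (Map.Entry<Object, Object> e : props.entrySet()) {
      String key = e.getKey().toString();
      if (key.startsWith(prefix)) {
        context.put(key.substring(prefix.length()), e.getValue().toString());
      }
    }
    // context now holds the stripped keys: columns (and type itself)
    System.out.println(context);
  }
}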
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java b/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java
deleted file mode 100644
index c029cd0..0000000
--- a/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.interceptor;
-
-import java.io.Serializable;
-
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * Test-side representation of a raw event: a GUID slice, an event timestamp, and the offset at which dimension data begins.
- */
-public class RawEvent implements Serializable
-{
- public Slice guid;
- public long time;
- public int dimensionsOffset;
-
- public Slice getGUID()
- {
- return guid;
- }
-
- public long getTime()
- {
- return time;
- }
-
- RawEvent()
- {
- /* needed for Kryo serialization */
- }
-
- public static RawEvent from(byte[] row, byte separator)
- {
- final int rowsize = row.length;
-
- /*
- * Let's get the GUID out of the current record
- */
- int sliceLength = -1;
- while (++sliceLength < rowsize) {
- if (row[sliceLength] == separator) {
- break;
- }
- }
-
- int i = sliceLength + 1;
-
- /* let's parse the date */
- int dateStart = i;
- while (i < rowsize) {
- if (row[i++] == separator) {
- long time = DATE_PARSER.parseMillis(new String(row, dateStart, i - dateStart - 1));
- RawEvent event = new RawEvent();
- event.guid = new Slice(row, 0, sliceLength);
- event.time = time;
- event.dimensionsOffset = i;
- return event;
- }
- }
-
- return null;
- }
-
- @Override
- public int hashCode()
- {
- int hash = 5;
- hash = 61 * hash + (this.guid != null ? this.guid.hashCode() : 0);
- hash = 61 * hash + (int)(this.time ^ (this.time >>> 32));
- return hash;
- }
-
- @Override
- public String toString()
- {
- return "RawEvent{" + "guid=" + guid + ", time=" + time + '}';
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- final RawEvent other = (RawEvent)obj;
- if (this.guid != other.guid && (this.guid == null || !this.guid.equals(other.guid))) {
- return false;
- }
- return this.time == other.time;
- }
-
- private static final DateTimeFormatter DATE_PARSER = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
- private static final Logger logger = LoggerFactory.getLogger(RawEvent.class);
- private static final long serialVersionUID = 201312191312L;
-}
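RawEvent.from() expects a record laid out as GUID, separator, a "yyyy-MM-dd HH:mm:ss" timestamp, separator, then dimension data, and it returns null when the second separator never appears. A usage sketch against a made-up record (the 32-character GUID mirrors what InterceptorTestHelper asserts; assumes RawEvent is on the classpath):

public class RawEventDemo
{
  public static void main(String[] args)
  {
    // made-up record: 32-char GUID, \001, timestamp, \001, dimensions
    byte[] row = ("0123456789abcdef0123456789abcdef" + "\001"
        + "2014-01-01 12:00:00" + "\001" + "dims").getBytes();
    RawEvent event = RawEvent.from(row, (byte)1);
    System.out.println(event);                  // RawEvent{guid=..., time=...}
    System.out.println(event.dimensionsOffset); // 53 = 32 + 1 + 19 + 1
  }
}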
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java b/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java
deleted file mode 100644
index 2f162a8..0000000
--- a/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.operator;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Checks the per-thread reuse semantics of ThreadLocal with an overridden initialValue().
- */
-public class AbstractFlumeInputOperatorTest
-{
- public AbstractFlumeInputOperatorTest()
- {
- }
-
- @Test
- public void testThreadLocal()
- {
- ThreadLocal<Set<Integer>> tl = new ThreadLocal<Set<Integer>>()
- {
- @Override
- protected Set<Integer> initialValue()
- {
- return new HashSet<Integer>();
- }
-
- };
- Set<Integer> get1 = tl.get();
- get1.add(1);
- assertTrue("Just Added Value", get1.contains(1));
-
- Set<Integer> get2 = tl.get();
- assertTrue("Previously added value", get2.contains(1));
- }
-
-}
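The test exercises only the single-thread half of the ThreadLocal contract: repeated get() calls on one thread return the same set. The complementary property is that a different thread sees its own fresh initialValue(); a quick self-contained sketch of that isolation:

import java.util.HashSet;
import java.util.Set;

public class ThreadLocalIsolation
{
  static final ThreadLocal<Set<Integer>> TL = new ThreadLocal<Set<Integer>>()
  {
    @Override
    protected Set<Integer> initialValue()
    {
      return new HashSet<Integer>();
    }
  };

  public static void main(String[] args) throws InterruptedException
  {
    TL.get().add(1); // goes into the main thread's set only

    Thread other = new Thread(new Runnable()
    {
      @Override
      public void run()
      {
        // a fresh set: the value added on the main thread is not visible here
        System.out.println(TL.get().contains(1)); // prints false
      }
    });
    other.start();
    other.join();

    System.out.println(TL.get().contains(1)); // prints true on the main thread
  }
}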
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java b/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java
deleted file mode 100644
index 7949e63..0000000
--- a/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.sink;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.channel.MemoryChannel;
-
-import com.datatorrent.flume.discovery.Discovery;
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
-import com.datatorrent.netlet.DefaultEventLoop;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * Starts a DTFlumeSink on an ephemeral port and verifies that an ECHO request is returned intact.
- */
-public class DTFlumeSinkTest
-{
- static final String hostname = "localhost";
- int port = 0;
-
- @Test
- @SuppressWarnings("SleepWhileInLoop")
- public void testServer() throws InterruptedException, IOException
- {
- Discovery<byte[]> discovery = new Discovery<byte[]>()
- {
- @Override
- public synchronized void unadvertise(Service<byte[]> service)
- {
- notify();
- }
-
- @Override
- public synchronized void advertise(Service<byte[]> service)
- {
- port = service.getPort();
- logger.debug("listening at {}", service);
- notify();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public synchronized Collection<Service<byte[]>> discover()
- {
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new RuntimeException(ie);
- }
- return Collections.EMPTY_LIST;
- }
-
- };
- DTFlumeSink sink = new DTFlumeSink();
- sink.setName("TeskSink");
- sink.setHostname(hostname);
- sink.setPort(0);
- sink.setAcceptedTolerance(2000);
- sink.setChannel(new MemoryChannel());
- sink.setDiscovery(discovery);
- sink.start();
- AbstractLengthPrependerClient client = new AbstractLengthPrependerClient()
- {
- private byte[] array;
- private int offset = 2;
-
- @Override
- public void onMessage(byte[] buffer, int offset, int size)
- {
- Slice received = new Slice(buffer, offset, size);
- logger.debug("Client Received = {}", received);
- Assert.assertEquals(received,
- new Slice(Arrays.copyOfRange(array, this.offset, array.length), 0, Server.Request.FIXED_SIZE));
- synchronized (DTFlumeSinkTest.this) {
- DTFlumeSinkTest.this.notify();
- }
- }
-
- @Override
- public void connected()
- {
- super.connected();
- array = new byte[Server.Request.FIXED_SIZE + offset];
- array[offset] = Server.Command.ECHO.getOrdinal();
- array[offset + 1] = 1;
- array[offset + 2] = 2;
- array[offset + 3] = 3;
- array[offset + 4] = 4;
- array[offset + 5] = 5;
- array[offset + 6] = 6;
- array[offset + 7] = 7;
- array[offset + 8] = 8;
- Server.writeLong(array, offset + Server.Request.TIME_OFFSET, System.currentTimeMillis());
- write(array, offset, Server.Request.FIXED_SIZE);
- }
-
- };
-
- DefaultEventLoop eventloop = new DefaultEventLoop("Eventloop-TestClient");
- eventloop.start();
- discovery.discover();
- try {
- eventloop.connect(new InetSocketAddress(hostname, port), client);
- try {
- synchronized (this) {
- this.wait();
- }
- } finally {
- eventloop.disconnect(client);
- }
- } finally {
- eventloop.stop();
- }
-
- sink.stop();
- }
-
- private static final Logger logger = LoggerFactory.getLogger(DTFlumeSinkTest.class);
-}
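The Discovery stub above doubles as the test's synchronization: discover() blocks in wait() until advertise() records the sink's ephemeral port and calls notify(). Because the stub waits without re-checking a condition, a spurious wakeup could in principle release it before the port is published; the conventional guarded form is sketched below (class and method names are hypothetical):

public class Rendezvous
{
  private int port = -1;

  synchronized void advertise(int p)
  {
    port = p;
    notify(); // wake any thread blocked in awaitPort()
  }

  synchronized int awaitPort() throws InterruptedException
  {
    while (port < 0) {
      wait(); // guarded against spurious wakeups by the loop
    }
    return port;
  }

  public static void main(String[] args) throws InterruptedException
  {
    final Rendezvous r = new Rendezvous();
    new Thread(new Runnable()
    {
      @Override
      public void run()
      {
        r.advertise(4242); // e.g. the server's ephemeral port
      }
    }).start();
    System.out.println(r.awaitPort()); // prints 4242
  }
}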
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java b/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java
deleted file mode 100644
index 8c225d1..0000000
--- a/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.sink;
-
-import java.util.Random;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Round-trip tests for the int/long byte-array codec in Server.
- */
-public class ServerTest
-{
- byte[] array;
-
- public ServerTest()
- {
- array = new byte[1024];
- }
-
- @Test
- public void testInt()
- {
- Server.writeInt(array, 0, Integer.MAX_VALUE);
- Assert.assertEquals("Max Integer", Integer.MAX_VALUE, Server.readInt(array, 0));
-
- Server.writeInt(array, 0, Integer.MIN_VALUE);
- Assert.assertEquals("Min Integer", Integer.MIN_VALUE, Server.readInt(array, 0));
-
- Server.writeInt(array, 0, 0);
- Assert.assertEquals("Zero Integer", 0, Server.readInt(array, 0));
-
- Random rand = new Random();
- for (int i = 0; i < 128; i++) {
- int n = rand.nextInt();
- if (rand.nextBoolean()) {
- n = -n;
- }
- Server.writeInt(array, 0, n);
- Assert.assertEquals("Random Integer", n, Server.readInt(array, 0));
- }
- }
-
- @Test
- public void testLong()
- {
- Server.writeLong(array, 0, Integer.MAX_VALUE);
- Assert.assertEquals("Max Integer", Integer.MAX_VALUE, Server.readLong(array, 0));
-
- Server.writeLong(array, 0, Integer.MIN_VALUE);
- Assert.assertEquals("Min Integer", Integer.MIN_VALUE, Server.readLong(array, 0));
-
- Server.writeLong(array, 0, 0);
- Assert.assertEquals("Zero Integer", 0L, Server.readLong(array, 0));
-
- Server.writeLong(array, 0, Long.MAX_VALUE);
- Assert.assertEquals("Max Long", Long.MAX_VALUE, Server.readLong(array, 0));
-
- Server.writeLong(array, 0, Long.MIN_VALUE);
- Assert.assertEquals("Min Long", Long.MIN_VALUE, Server.readLong(array, 0));
-
- Server.writeLong(array, 0, 0L);
- Assert.assertEquals("Zero Long", 0L, Server.readLong(array, 0));
-
- Random rand = new Random();
- for (int i = 0; i < 128; i++) {
- long n = rand.nextLong();
- if (rand.nextBoolean()) {
- n = -n;
- }
- Server.writeLong(array, 0, n);
- Assert.assertEquals("Random Long", n, Server.readLong(array, 0));
- }
- }
-
-}
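Both tests pin down one property only: writeInt/readInt and writeLong/readLong must be inverses across the full value range, sign included; the byte order itself is internal to Server. A self-contained sketch of one encoding that satisfies the round trip, assuming (hypothetically) least-significant-byte-first layout:

public class CodecSketch
{
  static void writeLong(byte[] buf, int off, long v)
  {
    for (int i = 0; i < 8; i++) {
      buf[off + i] = (byte)(v >>> (8 * i)); // least significant byte first
    }
  }

  static long readLong(byte[] buf, int off)
  {
    long v = 0;
    for (int i = 7; i >= 0; i--) {
      v = (v << 8) | (buf[off + i] & 0xffL); // rebuild all 64 bits, sign intact
    }
    return v;
  }

  public static void main(String[] args)
  {
    byte[] buf = new byte[8];
    writeLong(buf, 0, Long.MIN_VALUE);
    System.out.println(readLong(buf, 0) == Long.MIN_VALUE); // prints true
  }
}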
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java
deleted file mode 100644
index 6b6adcb..0000000
--- a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.storage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.primitives.Ints;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * Manual driver that stores sequential integers in HDFSStorage and verifies they are retrieved in order.
- */
-public class HDFSStorageMatching
-{
-
- public static void main(String[] args)
- {
- HDFSStorage storage = new HDFSStorage();
- storage.setBaseDir(args[0]);
- storage.setId(args[1]);
- storage.setRestore(true);
- storage.setup(null);
- int count = 100000000;
-
- logger.debug(" start time {}", System.currentTimeMillis());
- int index = 10000;
- byte[] b = Ints.toByteArray(index);
- for (int i = 0; i < count; i++) {
- storage.store(new Slice(b, 0, b.length));
- index++;
- b = Ints.toByteArray(index);
- }
- storage.flush();
- for (int i = 0; i < count; i++) {
- storage.store(new Slice(b, 0, b.length));
- index++;
- b = Ints.toByteArray(index);
- }
- storage.flush();
- for (int i = 0; i < count; i++) {
- storage.store(new Slice(b, 0, b.length));
- index++;
- b = Ints.toByteArray(index);
- }
- storage.flush();
- for (int i = 0; i < count; i++) {
- storage.store(new Slice(b, 0, b.length));
- index++;
- b = Ints.toByteArray(index);
- }
- storage.flush();
- for (int i = 0; i < count; i++) {
- storage.store(new Slice(b, 0, b.length));
- index++;
- b = Ints.toByteArray(index);
- }
- storage.flush();
- logger.debug(" end time {}", System.currentTimeMillis());
- logger.debug(" start time for retrieve {}", System.currentTimeMillis());
- b = storage.retrieve(new byte[8]);
- int origIndex = index;
- index = 10000;
- if (!match(b, index)) { throw new RuntimeException("failed : " + index); }
- while (true) {
- index++;
- b = storage.retrieveNext();
- if (b == null) {
- logger.debug(" end time for retrieve {}/{}/{}", System.currentTimeMillis(), index, org_index);
- return;
- } else {
- if (!match(b, index)) {
- throw new RuntimeException("failed : " + index);
- }
- }
- }
-
- }
-
- public static boolean match(byte[] data, int match)
- {
- byte[] tempData = new byte[data.length - 8];
- System.arraycopy(data, 8, tempData, 0, tempData.length);
- int dataR = Ints.fromByteArray(tempData);
- //logger.debug("input: {}, output: {}",match,dataR);
- if (match == dataR) {
- return true;
- }
- return false;
- }
-
- private static final Logger logger = LoggerFactory.getLogger(HDFSStorageMatching.class);
-}
-
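match() skips the first 8 bytes of every retrieved block before decoding the payload, and retrieval is seeded with retrieve(new byte[8]), which together suggest each block is framed with an 8-byte header; that framing is an inference from usage here, not a documented contract. A small sketch of the decode step (Guava's Ints, as already used above):

import java.util.Arrays;

import com.google.common.primitives.Ints;

public class MatchSketch
{
  // strip the assumed 8-byte header and decode the 4-byte big-endian payload
  static int decodePayload(byte[] data)
  {
    return Ints.fromByteArray(Arrays.copyOfRange(data, 8, data.length));
  }

  public static void main(String[] args)
  {
    byte[] framed = new byte[12];
    System.arraycopy(Ints.toByteArray(10000), 0, framed, 8, 4); // payload after header
    System.out.println(decodePayload(framed) == 10000); // prints true
  }
}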
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java
deleted file mode 100644
index 098f3f7..0000000
--- a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.storage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * Manual driver that measures HDFSStorage store/flush/retrieve timing for fixed 1 KB payloads.
- */
-public class HDFSStoragePerformance
-{
-
- public static void main(String[] args)
- {
- HDFSStorage storage = new HDFSStorage();
- storage.setBaseDir(".");
- storage.setId("gaurav_flume_1");
- storage.setRestore(true);
- storage.setup(null);
- int count = 1000000;
-
- logger.debug(" start time {}", System.currentTimeMillis());
- int index = 10000;
- byte[] b = new byte[1024];
- for (int i = 0; i < count; i++) {
- storage.store(new Slice(b, 0, b.length));
- }
- storage.flush();
- for (int i = 0; i < count; i++) {
- storage.store(new Slice(b, 0, b.length));
- }
- storage.flush();
- for (int i = 0; i < count; i++) {
- storage.store(new Slice(b, 0, b.length));
- }
- storage.flush();
- logger.debug(" end time {}", System.currentTimeMillis());
- logger.debug(" start time for retrieve {}", System.currentTimeMillis());
- storage.retrieve(new byte[8]);
- String inputData = new String(b);
- index = 1;
- while (true) {
- b = storage.retrieveNext();
- if (b == null) {
- logger.debug(" end time for retrieve {}", System.currentTimeMillis());
- return;
- } else {
- if (!match(b, inputData)) {
- throw new RuntimeException("failed : " + index);
- }
- }
-
- index++;
- }
-
- }
-
- public static boolean match(byte[] data, String match)
- {
- byte[] tempData = new byte[data.length - 8];
- System.arraycopy(data, 8, tempData, 0, tempData.length);
-// logger.debug("input: {}, output: {}",match,new String(tempData));
- return (match.equals(new String(tempData)));
- }
-
- private static final Logger logger = LoggerFactory.getLogger(HDFSStoragePerformance.class);
-}
-
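The driver logs raw timestamps only, so throughput has to be derived by hand from the log output. A trivial sketch of wrapping the same store/flush sequence with a rate calculation (the sleep stands in for the three batches of one million 1 KB writes above):

public class ThroughputSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    long start = System.currentTimeMillis();
    Thread.sleep(1200); // stand-in for the store/flush batches
    long elapsedMillis = System.currentTimeMillis() - start;
    long totalSlices = 3L * 1000000; // three batches of one million slices
    System.out.printf("%.0f slices/sec%n", totalSlices * 1000.0 / elapsedMillis);
  }
}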