You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/05/18 20:42:05 UTC
[15/22] incubator-apex-malhar git commit: APEXMALHAR-2095 removed
checkstyle violations of malhar library module
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
index 4172ed4..945c000 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
@@ -32,14 +32,13 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator.CheckpointListener;
import com.datatorrent.api.Operator.IdleTimeHandler;
-
-import com.datatorrent.netlet.util.DTThrowable;
+import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.NameableThreadFactory;
+import com.datatorrent.netlet.util.DTThrowable;
/**
* This base operator queues input tuples for each window and asynchronously processes them after the window is committed.
@@ -115,12 +114,10 @@ public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator
if (execute) {
try {
Thread.sleep(spinningTime);
- }
- catch (InterruptedException ie) {
+ } catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
- }
- else {
+ } else {
logger.error("Exception: ", cause);
DTThrowable.rethrow(cause.get());
}
@@ -178,14 +175,14 @@ public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator
processCommittedData(output);
doneTuples.add(output);
}
- }
- catch (Throwable e) {
+ } catch (Throwable e) {
cause.set(e);
execute = false;
}
}
};
}
+
/**
* The implementation class should call this method to enqueue output once input is converted to queue input.
*
@@ -203,7 +200,7 @@ public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator
*
* @param input
*/
- abstract protected void processTuple(INPUT input);
+ protected abstract void processTuple(INPUT input);
/**
* This method is called once the window in which queueTuple was created is committed.
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputFileInputOperator.java
index a9604e3..0a96418 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractThroughputFileInputOperator.java
@@ -18,15 +18,16 @@
*/
package com.datatorrent.lib.io.fs;
-import com.datatorrent.api.Stats.OperatorStats;
-import com.datatorrent.lib.counters.BasicCounters;
-
import java.util.Collection;
-import org.apache.commons.lang.mutable.MutableLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.mutable.MutableLong;
+
+import com.datatorrent.api.Stats.OperatorStats;
+import com.datatorrent.lib.counters.BasicCounters;
+
/**
* This is the base implementation for a file input operator, which scans a directory for files.
* Files are then read and split into tuples, which are emitted.
@@ -134,7 +135,7 @@ public abstract class AbstractThroughputFileInputOperator<T> extends AbstractFil
int newOperatorCount;
int totalFileCount = 0;
- for(Partition<AbstractFileInputOperator<T>> partition : partitions) {
+ for (Partition<AbstractFileInputOperator<T>> partition : partitions) {
AbstractFileInputOperator<T> oper = partition.getPartitionedInstance();
totalFileCount += oper.failedFiles.size();
totalFileCount += oper.pendingFiles.size();
@@ -145,11 +146,10 @@ public abstract class AbstractThroughputFileInputOperator<T> extends AbstractFil
}
}
- if(!isInitialParitition) {
+ if (!isInitialParitition) {
LOG.debug("definePartitions: Total File Count: {}", totalFileCount);
newOperatorCount = computeOperatorCount(totalFileCount);
- }
- else {
+ } else {
newOperatorCount = partitionCount;
}
@@ -160,13 +160,13 @@ public abstract class AbstractThroughputFileInputOperator<T> extends AbstractFil
{
int newOperatorCount = totalFileCount / preferredMaxPendingFilesPerOperator;
- if(totalFileCount % preferredMaxPendingFilesPerOperator > 0) {
+ if (totalFileCount % preferredMaxPendingFilesPerOperator > 0) {
newOperatorCount++;
}
- if(newOperatorCount > partitionCount) {
+ if (newOperatorCount > partitionCount) {
newOperatorCount = partitionCount;
}
- if(newOperatorCount == 0) {
+ if (newOperatorCount == 0) {
newOperatorCount = 1;
}
@@ -179,17 +179,17 @@ public abstract class AbstractThroughputFileInputOperator<T> extends AbstractFil
{
BasicCounters<MutableLong> fileCounters = null;
- for(OperatorStats operatorStats: batchedOperatorStats.getLastWindowedStats()) {
- if(operatorStats.counters != null) {
- fileCounters = (BasicCounters<MutableLong>) operatorStats.counters;
+ for (OperatorStats operatorStats : batchedOperatorStats.getLastWindowedStats()) {
+ if (operatorStats.counters != null) {
+ fileCounters = (BasicCounters<MutableLong>)operatorStats.counters;
}
}
Response response = new Response();
- if(fileCounters != null &&
- fileCounters.getCounter(FileCounters.PENDING_FILES).longValue() > 0L ||
- System.currentTimeMillis() - repartitionInterval <= lastRepartition) {
+ if (fileCounters != null &&
+ fileCounters.getCounter(FileCounters.PENDING_FILES).longValue() > 0L ||
+ System.currentTimeMillis() - repartitionInterval <= lastRepartition) {
response.repartitionRequired = false;
return response;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
index 48d4ae6..69e44a5 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
@@ -21,7 +21,11 @@ package com.datatorrent.lib.io.fs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
-import java.util.*;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
@@ -33,13 +37,14 @@ import javax.annotation.Nullable;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -49,13 +54,16 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.datatorrent.api.*;
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OperatorAnnotation;
-
-import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.io.IdempotentStorageManager;
import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
+import com.datatorrent.netlet.util.DTThrowable;
/**
* Input operator that scans a directory for files and splits a file into blocks.<br/>
@@ -129,8 +137,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
try {
fs = scanner.getFSInstance();
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException("creating fs", e);
}
@@ -138,10 +145,10 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
blockSize = fs.getDefaultBlockSize(new Path(scanner.files.iterator().next()));
}
- if (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) < idempotentStorageManager.getLargestRecoveryWindow()) {
+ if (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) <
+ idempotentStorageManager.getLargestRecoveryWindow()) {
blockMetadataIterator = null;
- }
- else {
+ } else {
//don't setup scanner while recovery
scanner.setup(context);
}
@@ -153,15 +160,12 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
{
try {
scanner.teardown();
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
DTThrowable.rethrow(t);
- }
- finally {
+ } finally {
try {
fs.close();
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException(e);
}
}
@@ -181,8 +185,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
{
try {
@SuppressWarnings("unchecked")
- LinkedList<FileInfo> recoveredData = (LinkedList<FileInfo>) idempotentStorageManager.load(operatorId,
- windowId);
+ LinkedList<FileInfo> recoveredData = (LinkedList<FileInfo>)idempotentStorageManager.load(operatorId, windowId);
if (recoveredData == null) {
//This could happen when there are multiple physical instances and one of them is ahead in processing windows.
return;
@@ -193,8 +196,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
for (FileInfo info : recoveredData) {
if (info.directoryPath != null) {
scanner.lastModifiedTimes.put(info.directoryPath, info.modifiedTime);
- }
- else { //no directory
+ } else { //no directory
scanner.lastModifiedTimes.put(info.relativeFilePath, info.modifiedTime);
}
@@ -211,8 +213,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
if (windowId == idempotentStorageManager.getLargestRecoveryWindow()) {
scanner.setup(context);
}
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException("replay", e);
}
}
@@ -250,8 +251,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
if (fileInfo.lastFileOfScan) {
break;
}
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException("creating metadata", e);
}
}
@@ -263,8 +263,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) {
try {
idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException("saving recovery", e);
}
}
@@ -280,8 +279,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
while (blockMetadataIterator.hasNext()) {
if (blockCount++ < blocksThreshold) {
this.blocksMetadataOutput.emit(blockMetadataIterator.next());
- }
- else {
+ } else {
return false;
}
}
@@ -293,7 +291,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
* Can be overridden for creating block metadata of a type that extends {@link FileBlockMetadata}
*/
protected FileBlockMetadata createBlockMetadata(long pos, long lengthOfFileInBlock, int blockNumber,
- FileMetadata fileMetadata, boolean isLast)
+ FileMetadata fileMetadata, boolean isLast)
{
return new FileBlockMetadata(fileMetadata.getFilePath(), fileMetadata.getBlockIds()[blockNumber - 1], pos,
lengthOfFileInBlock, isLast, blockNumber == 1 ? -1 : fileMetadata.getBlockIds()[blockNumber - 2]);
@@ -321,7 +319,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
fileMetadata.setFileLength(status.getLen());
if (!status.isDirectory()) {
- int noOfBlocks = (int) ((status.getLen() / blockSize) + (((status.getLen() % blockSize) == 0) ? 0 : 1));
+ int noOfBlocks = (int)((status.getLen() / blockSize) + (((status.getLen() % blockSize) == 0) ? 0 : 1));
if (fileMetadata.getDataOffset() >= status.getLen()) {
noOfBlocks = 0;
}
@@ -335,7 +333,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
{
// block ids are 32 bits of operatorId | 32 bits of sequence number
long[] blockIds = new long[fileMetadata.getNumberOfBlocks()];
- long longLeftSide = ((long) operatorId) << 32;
+ long longLeftSide = ((long)operatorId) << 32;
for (int i = 0; i < fileMetadata.getNumberOfBlocks(); i++) {
blockIds[i] = longLeftSide | sequenceNo++ & 0xFFFFFFFFL;
}
@@ -392,8 +390,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
{
try {
idempotentStorageManager.deleteUpTo(operatorId, l);
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException(e);
}
}
@@ -671,8 +668,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
}
try {
fs = getFSInstance();
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException("opening fs", e);
}
scanService.submit(this);
@@ -685,8 +681,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
scanService.shutdownNow();
try {
fs.close();
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException("closing fs", e);
}
}
@@ -708,13 +703,11 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
scan(new Path(afile), null);
}
scanComplete();
- }
- else {
+ } else {
Thread.sleep(sleepMillis);
}
}
- }
- catch (Throwable throwable) {
+ } catch (Throwable throwable) {
LOG.error("service", throwable);
running = false;
atomicThrowable.set(throwable);
@@ -776,29 +769,25 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
LOG.debug("found {}", childPathStr);
FileInfo info;
- if(rootPath == null) {
- info =parentStatus.isDirectory() ?
- new FileInfo(parentPathStr, childPath.getName(), parentStatus.getModificationTime()) :
- new FileInfo(null, childPathStr, parentStatus.getModificationTime());
- }
- else {
+ if (rootPath == null) {
+ info = parentStatus.isDirectory() ?
+ new FileInfo(parentPathStr, childPath.getName(), parentStatus.getModificationTime()) :
+ new FileInfo(null, childPathStr, parentStatus.getModificationTime());
+ } else {
URI relativeChildURI = rootPath.toUri().relativize(childPath.toUri());
info = new FileInfo(rootPath.toUri().getPath(), relativeChildURI.getPath(),
- parentStatus.getModificationTime());
+ parentStatus.getModificationTime());
}
discoveredFiles.add(info);
- }
- else {
+ } else {
// don't look at it again
ignoredFiles.add(childPathStr);
}
}
- }
- catch (FileNotFoundException fnf) {
+ } catch (FileNotFoundException fnf) {
LOG.warn("Failed to list directory {}", filePath, fnf);
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException("listing files", e);
}
}
@@ -813,7 +802,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
* @throws IOException
*/
protected boolean skipFile(@SuppressWarnings("unused") @NotNull Path path, @NotNull Long modificationTime,
- Long lastModificationTime) throws IOException
+ Long lastModificationTime) throws IOException
{
return (!(lastModificationTime == null || modificationTime > lastModificationTime));
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java
index 595abde..35530a3 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java
@@ -44,7 +44,7 @@ public interface FilterStreamContext<F extends FilterOutputStream>
* Base filter context that can be extended to build custom filters.
* @param <F> The Filter output stream
*/
- public static abstract class BaseFilterStreamContext<F extends FilterOutputStream> implements FilterStreamContext<F>
+ abstract class BaseFilterStreamContext<F extends FilterOutputStream> implements FilterStreamContext<F>
{
protected transient F filterStream;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java
index 58b51af..75e6e5f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java
@@ -21,7 +21,12 @@ package com.datatorrent.lib.io.fs;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import com.google.common.collect.Maps;
@@ -37,7 +42,7 @@ public interface FilterStreamProvider<F extends FilterOutputStream, S extends Ou
public void reclaimFilterStreamContext(FilterStreamContext<F> filterStreamContext);
- public static abstract class SimpleFilterReusableStreamProvider<F extends FilterOutputStream, S extends OutputStream> implements FilterStreamProvider<F, S>
+ abstract class SimpleFilterReusableStreamProvider<F extends FilterOutputStream, S extends OutputStream> implements FilterStreamProvider<F, S>
{
private transient Map<OutputStream, FilterStreamContext<F>> reusableContexts = Maps.newHashMap();
@@ -112,7 +117,9 @@ public interface FilterStreamProvider<F extends FilterOutputStream, S extends Ou
}
}
- private class FilterChainStreamContext extends FilterStreamContext.BaseFilterStreamContext implements FilterStreamContext {
+ private class FilterChainStreamContext extends FilterStreamContext.BaseFilterStreamContext
+ implements FilterStreamContext
+ {
private List<FilterStreamContext<?>> streamContexts = new ArrayList<FilterStreamContext<?>>();
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/fs/TailFsInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/TailFsInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/TailFsInputOperator.java
index 4ac03a6..f2e9a8c 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/TailFsInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/TailFsInputOperator.java
@@ -22,10 +22,11 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.io.FileUtils;
+
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
@@ -261,6 +262,7 @@ public class TailFsInputOperator implements InputOperator, ActivationListener<Op
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
+ //swallowing exception
}
--localCounter;
}
@@ -286,7 +288,7 @@ public class TailFsInputOperator implements InputOperator, ActivationListener<Op
}
accessTime = System.currentTimeMillis();
while ((ch = reader.read()) != -1) {
- readChar = (char) ch;
+ readChar = (char)ch;
if (readChar != delimiter) {
sb.append(readChar);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/fs/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/package-info.java b/library/src/main/java/com/datatorrent/lib/io/fs/package-info.java
index 2009d61..872a618 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/package-info.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/package-info.java
@@ -20,4 +20,4 @@
* Library of input operators for writing into file streams and output operators for reading from file streams.
* The file I/O operators interact with entities outside of DAG, and at times outside of Hadoop
*/
-package com.datatorrent.lib.io.fs;
\ No newline at end of file
+package com.datatorrent.lib.io.fs;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSOutputOperator.java
index 617b397..bac0816 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSOutputOperator.java
@@ -18,18 +18,22 @@
*/
package com.datatorrent.lib.io.jms;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Operator;
-import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
+
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+
/**
* This is the base implementation of an JMS output operator.
* A concrete operator should be created from this skeleton implementation.
@@ -93,8 +97,7 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
try {
createConnection();
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
logger.debug(ex.getLocalizedMessage());
throw new RuntimeException(ex);
}
@@ -103,8 +106,7 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
try {
store.connect();
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
@@ -112,7 +114,7 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
mode = context.getValue(OperatorContext.PROCESSING_MODE);
- if(mode==ProcessingMode.AT_MOST_ONCE){
+ if (mode == ProcessingMode.AT_MOST_ONCE) {
//Batch must be cleared to avoid writing same data twice
tupleBatch.clear();
}
@@ -135,8 +137,7 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
logger.debug("beginning teardown");
try {
store.disconnect();
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
@@ -161,7 +162,7 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
{
logger.debug("Ending window {}", currentWindowId);
- if(store.isExactlyOnce()) {
+ if (store.isExactlyOnce()) {
//Store committed window and data in same transaction
if (committedWindowId < currentWindowId) {
store.storeCommittedWindowId(appId, operatorId, currentWindowId);
@@ -170,8 +171,7 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
flushBatch();
store.commitTransaction();
- }
- else {
+ } else {
//For transactionable stores which cannot support exactly once, At least
//once can be insured by for storing the data and then the committed window
//id.
@@ -194,11 +194,10 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
{
logger.debug("flushing batch, batch size {}", tupleBatch.size());
- for (Message message: messageBatch) {
+ for (Message message : messageBatch) {
try {
producer.send(message);
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
@@ -215,7 +214,7 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
*/
protected void sendMessage(Object data)
{
- if(currentWindowId <= committedWindowId) {
+ if (currentWindowId <= committedWindowId) {
return;
}
@@ -249,8 +248,7 @@ public abstract class AbstractJMSOutputOperator extends JMSBase implements Opera
producer = null;
super.cleanup();
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
logger.error(null, ex);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java
index c7ffed3..efda6b0 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java
@@ -18,10 +18,11 @@
*/
package com.datatorrent.lib.io.jms;
-import com.datatorrent.api.DefaultInputPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.api.DefaultInputPort;
+
/**
* This is the base implementation of a single port JMS output operator.
* A concrete operator should be created from this skeleton implementation.
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/jms/FSPsuedoTransactionableStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/FSPsuedoTransactionableStore.java b/library/src/main/java/com/datatorrent/lib/io/jms/FSPsuedoTransactionableStore.java
index 61f1eb7..31eaf18 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/FSPsuedoTransactionableStore.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/FSPsuedoTransactionableStore.java
@@ -18,15 +18,22 @@
*/
package com.datatorrent.lib.io.jms;
-import com.datatorrent.api.annotation.Stateless;
import java.io.IOException;
+
import javax.jms.JMSException;
import javax.validation.constraints.NotNull;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.annotation.Stateless;
+
/**
* This is a JMS store which stores committed window ids in a file. This is not a true
* transactionable store because there is a chance that a failure may occur in between storing the
@@ -85,9 +92,8 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
{
FileSystem tempFS = FileSystem.newInstance(new Path(recoveryDirectory).toUri(), new Configuration());
- if(tempFS instanceof LocalFileSystem)
- {
- tempFS = ((LocalFileSystem) tempFS).getRaw();
+ if (tempFS instanceof LocalFileSystem) {
+ tempFS = ((LocalFileSystem)tempFS).getRaw();
}
return tempFS;
@@ -118,12 +124,10 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
try {
//No committed window stored, return negative invalid window.
- if(!fs.exists(recoveryPath))
- {
+ if (!fs.exists(recoveryPath)) {
return Stateless.WINDOW_ID;
}
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
@@ -132,16 +136,15 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
try {
FileStatus[] windowFiles = fs.listStatus(recoveryPath);
- for(FileStatus fileStatus: windowFiles) {
+ for (FileStatus fileStatus : windowFiles) {
String windowString = fileStatus.getPath().getName();
long tempWindow = Long.parseLong(windowString);
- if(maxWindow < tempWindow) {
+ if (maxWindow < tempWindow) {
maxWindow = tempWindow;
}
}
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
@@ -159,14 +162,13 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
fs.create(windowPath);
FileStatus[] windowFiles = fs.listStatus(recoveryPath);
- for(FileStatus fileStatus: windowFiles) {
+ for (FileStatus fileStatus : windowFiles) {
Path tempPath = fileStatus.getPath();
- if(!tempPath.getName().equals(windowString)) {
+ if (!tempPath.getName().equals(windowString)) {
fs.delete(tempPath, true);
}
}
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@@ -175,10 +177,8 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
public void removeCommittedWindowId(String appId, int operatorId)
{
try {
- fs.delete(getOperatorRecoveryPath(appId, operatorId).getParent(),
- true);
- }
- catch (IOException ex) {
+ fs.delete(getOperatorRecoveryPath(appId, operatorId).getParent(), true);
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@@ -194,8 +194,7 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
{
try {
this.getBase().getSession().commit();
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
throw new RuntimeException(ex);
}
@@ -207,8 +206,7 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
{
try {
this.getBase().getSession().rollback();
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
throw new RuntimeException(ex);
}
@@ -249,13 +247,9 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
return false;
}
- private Path getOperatorRecoveryPath(String appId,
- int operatorId)
+ private Path getOperatorRecoveryPath(String appId, int operatorId)
{
- return new Path(DEFAULT_RECOVERY_DIRECTORY + "/" +
- appId + "/" +
- operatorId + "/" +
- COMMITTED_WINDOW_DIR);
+ return new Path(DEFAULT_RECOVERY_DIRECTORY + "/" + appId + "/" + operatorId + "/" + COMMITTED_WINDOW_DIR);
}
/**
@@ -265,14 +259,9 @@ public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore
* @param windowId The id of the current window.
* @return The path where the windowId is stored.
*/
- private Path getOperatorWindowRecoveryPath(String appId,
- int operatorId,
- long windowId)
+ private Path getOperatorWindowRecoveryPath(String appId, int operatorId, long windowId)
{
- return new Path(DEFAULT_RECOVERY_DIRECTORY + "/" +
- appId + "/" +
- operatorId + "/" +
- COMMITTED_WINDOW_DIR + "/" +
- windowId);
+ return new Path(DEFAULT_RECOVERY_DIRECTORY + "/" + appId + "/" + operatorId + "/" + COMMITTED_WINDOW_DIR + "/" +
+ windowId);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
index 6db6a4d..48ed2c3 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
@@ -315,17 +315,13 @@ public class JMSBase
{
if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) {
return Session.CLIENT_ACKNOWLEDGE;
- }
- else if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {
+ } else if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {
return Session.AUTO_ACKNOWLEDGE;
- }
- else if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {
+ } else if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {
return Session.DUPS_OK_ACKNOWLEDGE;
- }
- else if ("SESSION_TRANSACTED".equals(ackMode)) {
+ } else if ("SESSION_TRANSACTED".equals(ackMode)) {
return Session.SESSION_TRANSACTED;
- }
- else {
+ } else {
return Session.CLIENT_ACKNOWLEDGE; // default
}
}
@@ -372,8 +368,7 @@ public class JMSBase
BeanUtils.populate(cf, connectionFactoryProperties);
logger.debug("creation successful.");
return cf;
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new RuntimeException("Failed to create connection factory.", e);
}
}
@@ -388,8 +383,7 @@ public class JMSBase
connection.close();
session = null;
connection = null;
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
logger.debug(ex.getLocalizedMessage());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/jms/JMSMultiPortOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSMultiPortOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSMultiPortOutputOperator.java
index 9caa833..3bb8cb9 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSMultiPortOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSMultiPortOutputOperator.java
@@ -18,14 +18,20 @@
*/
package com.datatorrent.lib.io.jms;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import java.io.Serializable;
import java.util.Map;
-import javax.jms.*;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+
/**
* @since 2.1.0
*/
@@ -108,28 +114,22 @@ public class JMSMultiPortOutputOperator extends AbstractJMSOutputOperator
try {
if (tuple instanceof Message) {
return (Message)tuple;
- }
- else if (tuple instanceof String) {
+ } else if (tuple instanceof String) {
return getSession().createTextMessage((String)tuple);
- }
- else if (tuple instanceof byte[]) {
+ } else if (tuple instanceof byte[]) {
BytesMessage message = getSession().createBytesMessage();
message.writeBytes((byte[])tuple);
return message;
- }
- else if (tuple instanceof Map) {
+ } else if (tuple instanceof Map) {
return createMessageForMap((Map)tuple);
- }
- else if (tuple instanceof Serializable) {
+ } else if (tuple instanceof Serializable) {
return getSession().createObjectMessage((Serializable)tuple);
- }
- else {
+ } else {
throw new RuntimeException("Cannot convert object of type "
- + tuple.getClass() + "] to JMS message. Supported message "
- + "payloads are: String, byte array, Map<String,?>, Serializable object.");
+ + tuple.getClass() + "] to JMS message. Supported message "
+ + "payloads are: String, byte array, Map<String,?>, Serializable object.");
}
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
logger.error(ex.getLocalizedMessage());
throw new RuntimeException(ex);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/jms/JMSObjectInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSObjectInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSObjectInputOperator.java
index aa68802..0bc0c79 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSObjectInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSObjectInputOperator.java
@@ -18,15 +18,24 @@
*/
package com.datatorrent.lib.io.jms;
-import com.datatorrent.api.DefaultOutputPort;
import java.io.Serializable;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
-import javax.jms.*;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.api.DefaultOutputPort;
+
/**
* An implementation of AbstractJMSInputOperator which emits TextMessage,StreamMessage,BytesMessage,MapMessage
* and ObjectMessage on their respective ports.
@@ -54,20 +63,15 @@ public class JMSObjectInputOperator extends AbstractJMSInputOperator<Object>
{
if (message instanceof TextMessage) {
return ((TextMessage)message).getText();
- }
- else if (message instanceof StreamMessage) {
+ } else if (message instanceof StreamMessage) {
return ((StreamMessage)message).readString();
- }
- else if (message instanceof BytesMessage) {
+ } else if (message instanceof BytesMessage) {
return extractByteArrayFromMessage((BytesMessage)message);
- }
- else if (message instanceof MapMessage) {
+ } else if (message instanceof MapMessage) {
return extractMapFromMessage((MapMessage)message);
- }
- else if (message instanceof ObjectMessage) {
+ } else if (message instanceof ObjectMessage) {
return extractSerializableFromMessage((ObjectMessage)message);
- }
- else {
+ } else {
return message;
}
}
@@ -122,19 +126,16 @@ public class JMSObjectInputOperator extends AbstractJMSInputOperator<Object>
{
if (outputString.isConnected()) {
outputString.emit((String)payload);
- }
- else if (outputMap.isConnected()) {
+ } else if (outputMap.isConnected()) {
outputMap.emit((Map<String, Object>)payload);
- }
- else if (outputBytes.isConnected()) {
+ } else if (outputBytes.isConnected()) {
outputBytes.emit((byte[])payload);
- }
- else {
+ } else {
output.emit(payload);
}
}
@SuppressWarnings("unused")
- private static transient final Logger logger = LoggerFactory.getLogger(JMSObjectInputOperator.class);
+ private static final transient Logger logger = LoggerFactory.getLogger(JMSObjectInputOperator.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java
index 4c5c265..11b8447 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java
@@ -20,7 +20,14 @@ package com.datatorrent.lib.io.jms;
import java.io.IOException;
import java.util.Enumeration;
-import javax.jms.*;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,7 +67,7 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
try {
beginTransaction();
- BytesMessage message = (BytesMessage) consumer.receive();
+ BytesMessage message = (BytesMessage)consumer.receive();
logger.debug("Retrieved committed window message id {}", message.getJMSMessageID());
long windowId = message.readLong();
@@ -71,8 +78,7 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
logger.debug("Retrieved windowId {}", windowId);
return windowId;
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
@@ -80,20 +86,19 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
@Override
public void storeCommittedWindowId(String appId, int operatorId, long windowId)
{
- if(!inTransaction) {
+ if (!inTransaction) {
throw new RuntimeException("This should be called while you are in an existing transaction");
}
logger.debug("storing window appId {} operatorId {} windowId {}",
- appId, operatorId, windowId);
+ appId, operatorId, windowId);
try {
removeCommittedWindowId(appId, operatorId);
BytesMessage bytesMessage = this.getBase().getSession().createBytesMessage();
bytesMessage.writeLong(windowId);
producer.send(bytesMessage);
logger.debug("Retrieved committed window message id {}", bytesMessage.getJMSMessageID());
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
@@ -103,8 +108,7 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
{
try {
consumer.receive();
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
@@ -114,8 +118,7 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
{
logger.debug("beginning transaction");
- if(inTransaction)
- {
+ if (inTransaction) {
throw new RuntimeException("Cannot start a transaction twice.");
}
@@ -127,15 +130,13 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
{
logger.debug("committing transaction.");
- if(!inTransaction)
- {
+ if (!inTransaction) {
throw new RuntimeException("Cannot commit a transaction if you are not in one.");
}
try {
getBase().getSession().commit();
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
throw new RuntimeException(ex);
}
@@ -146,12 +147,9 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
@Override
public void rollbackTransaction()
{
- try
- {
+ try {
getBase().getSession().rollback();
- }
- catch (JMSException ex)
- {
+ } catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
@@ -168,12 +166,11 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
logger.debug("Entering connect. is in transaction: {}", inTransaction);
try {
- String queueName = getQueueName(getAppId(),
- getOperatorId());
+ String queueName = getQueueName(getAppId(), getOperatorId());
logger.debug("Base is null: {}", getBase() == null);
- if(getBase() != null) {
+ if (getBase() != null) {
logger.debug("Session is null: {}", getBase().getSession() == null);
}
@@ -184,8 +181,7 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
try {
Enumeration enumeration = browser.getEnumeration();
hasStore = enumeration.hasMoreElements();
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
throw new RuntimeException(ex);
}
@@ -195,15 +191,14 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
connected = true;
logger.debug("Connected. is in transaction: {}", inTransaction);
- if(!hasStore) {
+ if (!hasStore) {
beginTransaction();
BytesMessage message = getBase().getSession().createBytesMessage();
message.writeLong(-1L);
producer.send(message);
commitTransaction();
}
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
throw new RuntimeException(ex);
}
@@ -217,8 +212,7 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
try {
producer.close();
consumer.close();
- }
- catch (JMSException ex) {
+ } catch (JMSException ex) {
throw new RuntimeException(ex);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/jms/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/package-info.java b/library/src/main/java/com/datatorrent/lib/io/jms/package-info.java
index 49768c1..ad4f163 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/package-info.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/package-info.java
@@ -20,4 +20,4 @@
* Library of input operators for writing into jms broker and output operators for reading from jms broker.
* The jms operators interact with entities outside of DAG, and at times outside of Hadoop
*/
-package com.datatorrent.lib.io.jms;
\ No newline at end of file
+package com.datatorrent.lib.io.jms;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseMapOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseMapOutputOperator.java b/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseMapOutputOperator.java
index 5883b98..33bdbaa 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseMapOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseMapOutputOperator.java
@@ -24,14 +24,16 @@ import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import com.datatorrent.common.util.BaseOperator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.Stateless;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.datatorrent.common.util.BaseOperator;
/**
* This operator parses apache logs one line at a time (each tuple is a log line), using the given regex.
@@ -83,8 +85,7 @@ public class ApacheLogParseMapOutputOperator extends BaseOperator
{
try {
processTuple(s);
- }
- catch (ParseException ex) {
+ } catch (ParseException ex) {
throw new RuntimeException("Could not parse the input string", ex);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseOperator.java b/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseOperator.java
index 938a927..f1dffe8 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/ApacheLogParseOperator.java
@@ -22,11 +22,11 @@ import java.text.ParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
/**
* Parse Apache log lines one line at a time.
@@ -58,116 +58,115 @@ import com.datatorrent.api.annotation.Stateless;
* @since 0.3.3
*/
@Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
public class ApacheLogParseOperator extends BaseOperator
{
/**
* This is the input port which receives apache log lines.
*/
- public final transient DefaultInputPort<String> data = new DefaultInputPort<String>()
- {
- @Override
- public void process(String s)
- {
- try {
- processTuple(s);
- } catch (ParseException ex) {
- // ignore
- }
- }
- };
+ public final transient DefaultInputPort<String> data = new DefaultInputPort<String>()
+ {
+ @Override
+ public void process(String s)
+ {
+ try {
+ processTuple(s);
+ } catch (ParseException ex) {
+ // ignore
+ }
+ }
+ };
- /**
- * Client IP address, output port.
- */
- public final transient DefaultOutputPort<String> outputIPAddress = new DefaultOutputPort<String>();
+ /**
+ * Client IP address, output port.
+ */
+ public final transient DefaultOutputPort<String> outputIPAddress = new DefaultOutputPort<String>();
- /**
- * Access url port, output port.
- */
- public final transient DefaultOutputPort<String> outputUrl = new DefaultOutputPort<String>();
+ /**
+ * Access url port, output port.
+ */
+ public final transient DefaultOutputPort<String> outputUrl = new DefaultOutputPort<String>();
- /**
- * Apache status log, output port.
- */
- public final transient DefaultOutputPort<String> outputStatusCode = new DefaultOutputPort<String>();
+ /**
+ * Apache status log, output port.
+ */
+ public final transient DefaultOutputPort<String> outputStatusCode = new DefaultOutputPort<String>();
- /**
- * Number of bytes served, output port.
- */
- public final transient DefaultOutputPort<Long> outputBytes = new DefaultOutputPort<Long>();
+ /**
+ * Number of bytes served, output port.
+ */
+ public final transient DefaultOutputPort<Long> outputBytes = new DefaultOutputPort<Long>();
- /**
- * Referer name, output port.
- */
- public final transient DefaultOutputPort<String> outputReferer = new DefaultOutputPort<String>();
+ /**
+ * Referer name, output port.
+ */
+ public final transient DefaultOutputPort<String> outputReferer = new DefaultOutputPort<String>();
- /**
- * IP Agent, output port.
- */
- public final transient DefaultOutputPort<String> outputAgent = new DefaultOutputPort<String>();
+ /**
+ * IP Agent, output port.
+ */
+ public final transient DefaultOutputPort<String> outputAgent = new DefaultOutputPort<String>();
- /**
- * Get apache log pattern regex.
- * @return regex string.
- */
- protected static String getAccessLogRegex()
- {
- String regex1 = "^([\\d\\.]+)"; // Client IP
- String regex2 = " (\\S+)"; // -
- String regex3 = " (\\S+)"; // -
- String regex4 = " \\[([\\w:/]+\\s[+\\-]\\d{4})\\]"; // Date
- String regex5 = " \"[A-Z]+ (.+?) HTTP/\\S+\""; // url
- String regex6 = " (\\d{3})"; // HTTP code
- String regex7 = " (\\d+)"; // Number of bytes
- String regex8 = " \"([^\"]+)\""; // Referer
- String regex9 = " \"([^\"]+)\""; // Agent
- String regex10 = ".*"; // ignore the rest
- return regex1 + regex2 + regex3 + regex4 + regex5 + regex6 + regex7
- + regex8 + regex9 + regex10;
- }
+ /**
+ * Get apache log pattern regex.
+ * @return regex string.
+ */
+ protected static String getAccessLogRegex()
+ {
+ String regex1 = "^([\\d\\.]+)"; // Client IP
+ String regex2 = " (\\S+)"; // -
+ String regex3 = " (\\S+)"; // -
+ String regex4 = " \\[([\\w:/]+\\s[+\\-]\\d{4})\\]"; // Date
+ String regex5 = " \"[A-Z]+ (.+?) HTTP/\\S+\""; // url
+ String regex6 = " (\\d{3})"; // HTTP code
+ String regex7 = " (\\d+)"; // Number of bytes
+ String regex8 = " \"([^\"]+)\""; // Referer
+ String regex9 = " \"([^\"]+)\""; // Agent
+ String regex10 = ".*"; // ignore the rest
+ return regex1 + regex2 + regex3 + regex4 + regex5 + regex6 + regex7
+ + regex8 + regex9 + regex10;
+ }
- /**
- * Parses Apache combined access log, and prints out the following <br>
- * 1. Requester IP <br>
- * 2. Date of Request <br>
- * 3. Requested Page Path
- *
- * @param line
- * : tuple to parsee
- * @throws ParseException
- */
- public void processTuple(String line) throws ParseException
- {
- // Apapche log attaributes on each line.
- String url;
- String httpStatusCode;
- long numOfBytes;
- String referer;
- String agent;
- String ipAddr;
+ /**
+ * Parses Apache combined access log, and prints out the following <br>
+ * 1. Requester IP <br>
+ * 2. Date of Request <br>
+ * 3. Requested Page Path
+ *
+ * @param line
+ * : tuple to parse
+ * @throws ParseException
+ */
+ public void processTuple(String line) throws ParseException
+ {
+ // Apache log attributes on each line.
+ String url;
+ String httpStatusCode;
+ long numOfBytes;
+ String referer;
+ String agent;
+ String ipAddr;
- // Parse each log line.
- Pattern accessLogPattern = Pattern.compile(getAccessLogRegex(),
- Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
- Matcher accessLogEntryMatcher;
- accessLogEntryMatcher = accessLogPattern.matcher(line);
+ // Parse each log line.
+ Pattern accessLogPattern = Pattern.compile(getAccessLogRegex(),
+ Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
+ Matcher accessLogEntryMatcher;
+ accessLogEntryMatcher = accessLogPattern.matcher(line);
- if (accessLogEntryMatcher.matches()) {
- // System.out.println("MATCHED!");
- ipAddr = accessLogEntryMatcher.group(1);
- url = accessLogEntryMatcher.group(5);
- httpStatusCode = accessLogEntryMatcher.group(6);
- numOfBytes = Long.parseLong(accessLogEntryMatcher.group(7));
- referer = accessLogEntryMatcher.group(8);
- agent = accessLogEntryMatcher.group(9);
+ if (accessLogEntryMatcher.matches()) {
+ ipAddr = accessLogEntryMatcher.group(1);
+ url = accessLogEntryMatcher.group(5);
+ httpStatusCode = accessLogEntryMatcher.group(6);
+ numOfBytes = Long.parseLong(accessLogEntryMatcher.group(7));
+ referer = accessLogEntryMatcher.group(8);
+ agent = accessLogEntryMatcher.group(9);
- outputIPAddress.emit(ipAddr);
- outputUrl.emit(url);
- outputStatusCode.emit(httpStatusCode);
- outputBytes.emit(numOfBytes);
- outputReferer.emit(referer);
- outputAgent.emit(agent);
- }
- }
+ outputIPAddress.emit(ipAddr);
+ outputUrl.emit(url);
+ outputStatusCode.emit(httpStatusCode);
+ outputBytes.emit(numOfBytes);
+ outputReferer.emit(referer);
+ outputAgent.emit(agent);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/ApacheVirtualLogParseOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/ApacheVirtualLogParseOperator.java b/library/src/main/java/com/datatorrent/lib/logs/ApacheVirtualLogParseOperator.java
index 3861639..1ba555e 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/ApacheVirtualLogParseOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/ApacheVirtualLogParseOperator.java
@@ -18,11 +18,6 @@
*/
package com.datatorrent.lib.logs;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.Stateless;
-
import java.io.IOException;
import java.text.ParseException;
import java.util.HashMap;
@@ -30,6 +25,11 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
+
/**
* Parse Apache log lines one line at a time.
* Regex (getAccessLogRegex) is used as a parser.
@@ -47,146 +47,154 @@ import java.util.regex.Pattern;
* @since 0.3.2
*/
@Stateless
-public class ApacheVirtualLogParseOperator extends BaseOperator {
-
- // default date format
- protected static final String dateFormat = "dd/MMM/yyyy:HH:mm:ss Z";
- /**
- *
- */
- public final transient DefaultInputPort<String> data = new DefaultInputPort<String>() {
- @Override
- public void process(String s) {
- try {
- processTuple(s);
- } catch (ParseException ex) {
- // ignore
- }
- }
- };
-
- /**
- * This output port emits the IPAddresses contained in log file lines.
- */
- public final transient DefaultOutputPort<String> outputIPAddress = new DefaultOutputPort<String>();
- /**
- * This output port emits URLs contained in log file lines.
- */
- public final transient DefaultOutputPort<String> outputUrl = new DefaultOutputPort<String>();
- /**
- * This output port emits status codes contained in log file lines.
- */
- public final transient DefaultOutputPort<String> outputStatusCode = new DefaultOutputPort<String>();
- /**
- * This output pot emits a Map for each log file line,
- * which contains all the information extracted from the log file line.
- */
- public final transient DefaultOutputPort<Map<String, Integer>> outputBytes = new DefaultOutputPort<Map<String, Integer>>();
- /**
- * This output port emits the referers contained in the log file lines.
- */
- public final transient DefaultOutputPort<String> outputReferer = new DefaultOutputPort<String>();
- /**
- * This output port emits the agents contained in the log file lines.
- */
- public final transient DefaultOutputPort<String> outputAgent = new DefaultOutputPort<String>();
- /**
- * This output port emits the servernames contained in the log file lines.
- */
- public final transient DefaultOutputPort<String> outputServerName = new DefaultOutputPort<String>();
- /**
- * This output port emits the servernames contained in the log file lines.
- */
- public final transient DefaultOutputPort<String> outputServerName1 = new DefaultOutputPort<String>();
- /**
- * This output port emits the status codes corresponding to each url in a log file line.
- */
- public final transient DefaultOutputPort<Map<String, String>> outUrlStatus = new DefaultOutputPort<Map<String, String>>();
- /**
- * This output port emits the status associated with each server in a log file line.
- */
- public final transient DefaultOutputPort<Map<String, String>> outServerStatus = new DefaultOutputPort<Map<String, String>>();
- /**
- * This output port emits client data usage contained in log file lines.
- */
- public final transient DefaultOutputPort<Integer> clientDataUsage = new DefaultOutputPort<Integer>();
- /**
- * This output port emits the view counts contained in log file lines.
- */
- public final transient DefaultOutputPort<Integer> viewCount = new DefaultOutputPort<Integer>();
-
- protected static String getAccessLogRegex() {
- String regex0 = "^([^\"]+)";
- String regex1 = " ([\\d\\.]+)"; // Client IP
- String regex2 = " (\\S+)"; // -
- String regex3 = " (\\S+)"; // -
- String regex4 = " \\[([\\w:/]+\\s[+\\-]\\d{4})\\]"; // Date
- String regex5 = " \"[A-Z]+ (.+?) HTTP/\\S+\""; // url
- String regex6 = " (\\d{3})"; // HTTP code
- String regex7 = " (\\d+)"; // Number of bytes
- String regex8 = " \"([^\"]+)\""; // Referer
- String regex9 = " \"([^\"]+)\""; // Agent
- String regex10 = ".*"; // ignore the rest
- return regex0 + regex1 + regex2 + regex3 + regex4 + regex5 + regex6 + regex7 + regex8 + regex9 + regex10;
+public class ApacheVirtualLogParseOperator extends BaseOperator
+{
+
+ // default date format
+ protected static final String dateFormat = "dd/MMM/yyyy:HH:mm:ss Z";
+ /**
+ *
+ */
+ public final transient DefaultInputPort<String> data = new DefaultInputPort<String>()
+ {
+ @Override
+ public void process(String s)
+ {
+ try {
+ processTuple(s);
+ } catch (ParseException ex) {
+ // ignore
+ }
}
-
- /**
- * Parses Apache combined access log, and prints out the following <br>1.
- * Requester IP <br>2. Date of Request <br>3. Requested Page Path
- *
- * @param line : tuple to parsee
- * @throws ParseException
- * @throws IOException
- */
- public void processTuple(String line) throws ParseException {
-
- // Apache log properties.
- String url;
- String httpStatusCode;
- long numOfBytes;
- String referer;
- String agent;
- String ipAddr;
- String serverName;
-
- // Parser log.
- Pattern accessLogPattern = Pattern.compile(getAccessLogRegex(), Pattern.CASE_INSENSITIVE
- | Pattern.DOTALL);
- Matcher accessLogEntryMatcher;
- accessLogEntryMatcher = accessLogPattern.matcher(line);
-
- if (accessLogEntryMatcher.matches()) {
-
- serverName = accessLogEntryMatcher.group(1);
- ipAddr = accessLogEntryMatcher.group(2);
- url = accessLogEntryMatcher.group(6);
- httpStatusCode = accessLogEntryMatcher.group(7);
- numOfBytes = Long.parseLong(accessLogEntryMatcher.group(8));
- referer = accessLogEntryMatcher.group(9);
- agent = accessLogEntryMatcher.group(10);
-
- outputIPAddress.emit(ipAddr);
- outputUrl.emit(url);
- outputStatusCode.emit(httpStatusCode);
- Map<String, Integer> ipdata = new HashMap<String, Integer>();
- ipdata.put(ipAddr, (int)numOfBytes);
- outputBytes.emit(ipdata);
- outputReferer.emit(referer);
- outputAgent.emit(agent);
- outputServerName.emit(serverName);
- outputServerName1.emit(serverName);
-
- HashMap<String, String> urlStatus = new HashMap<String, String>();
- urlStatus.put(url, httpStatusCode);
- outUrlStatus.emit(urlStatus);
-
- HashMap<String, String> serverStatus = new HashMap<String, String>();
- serverStatus.put(serverName, httpStatusCode);
- outServerStatus.emit(serverStatus);
-
- clientDataUsage.emit((int)numOfBytes);
- viewCount.emit(new Integer(1));
- }
+ };
+
+ /**
+ * This output port emits the IPAddresses contained in log file lines.
+ */
+ public final transient DefaultOutputPort<String> outputIPAddress = new DefaultOutputPort<String>();
+ /**
+ * This output port emits URLs contained in log file lines.
+ */
+ public final transient DefaultOutputPort<String> outputUrl = new DefaultOutputPort<String>();
+ /**
+ * This output port emits status codes contained in log file lines.
+ */
+ public final transient DefaultOutputPort<String> outputStatusCode = new DefaultOutputPort<String>();
+ /**
+ * This output port emits a Map for each log file line,
+ * which contains all the information extracted from the log file line.
+ */
+ public final transient DefaultOutputPort<Map<String, Integer>> outputBytes =
+ new DefaultOutputPort<Map<String, Integer>>();
+ /**
+ * This output port emits the referers contained in the log file lines.
+ */
+ public final transient DefaultOutputPort<String> outputReferer = new DefaultOutputPort<String>();
+ /**
+ * This output port emits the agents contained in the log file lines.
+ */
+ public final transient DefaultOutputPort<String> outputAgent = new DefaultOutputPort<String>();
+ /**
+ * This output port emits the servernames contained in the log file lines.
+ */
+ public final transient DefaultOutputPort<String> outputServerName = new DefaultOutputPort<String>();
+ /**
+ * This output port emits the servernames contained in the log file lines.
+ */
+ public final transient DefaultOutputPort<String> outputServerName1 = new DefaultOutputPort<String>();
+ /**
+ * This output port emits the status codes corresponding to each url in a log file line.
+ */
+ public final transient DefaultOutputPort<Map<String, String>> outUrlStatus =
+ new DefaultOutputPort<Map<String, String>>();
+ /**
+ * This output port emits the status associated with each server in a log file line.
+ */
+ public final transient DefaultOutputPort<Map<String, String>> outServerStatus =
+ new DefaultOutputPort<Map<String, String>>();
+ /**
+ * This output port emits client data usage contained in log file lines.
+ */
+ public final transient DefaultOutputPort<Integer> clientDataUsage = new DefaultOutputPort<Integer>();
+ /**
+ * This output port emits the view counts contained in log file lines.
+ */
+ public final transient DefaultOutputPort<Integer> viewCount = new DefaultOutputPort<Integer>();
+
+ protected static String getAccessLogRegex()
+ {
+ String regex0 = "^([^\"]+)";
+ String regex1 = " ([\\d\\.]+)"; // Client IP
+ String regex2 = " (\\S+)"; // -
+ String regex3 = " (\\S+)"; // -
+ String regex4 = " \\[([\\w:/]+\\s[+\\-]\\d{4})\\]"; // Date
+ String regex5 = " \"[A-Z]+ (.+?) HTTP/\\S+\""; // url
+ String regex6 = " (\\d{3})"; // HTTP code
+ String regex7 = " (\\d+)"; // Number of bytes
+ String regex8 = " \"([^\"]+)\""; // Referer
+ String regex9 = " \"([^\"]+)\""; // Agent
+ String regex10 = ".*"; // ignore the rest
+ return regex0 + regex1 + regex2 + regex3 + regex4 + regex5 + regex6 + regex7 + regex8 + regex9 + regex10;
+ }
+
+ /**
+ * Parses Apache combined access log, and prints out the following <br>1.
+ * Requester IP <br>2. Date of Request <br>3. Requested Page Path
+ *
+ * @param line : tuple to parse
+ * @throws ParseException
+ * @throws IOException
+ */
+ public void processTuple(String line) throws ParseException
+ {
+
+ // Apache log properties.
+ String url;
+ String httpStatusCode;
+ long numOfBytes;
+ String referer;
+ String agent;
+ String ipAddr;
+ String serverName;
+
+ // Parser log.
+ Pattern accessLogPattern = Pattern.compile(getAccessLogRegex(), Pattern.CASE_INSENSITIVE
+ | Pattern.DOTALL);
+ Matcher accessLogEntryMatcher;
+ accessLogEntryMatcher = accessLogPattern.matcher(line);
+
+ if (accessLogEntryMatcher.matches()) {
+
+ serverName = accessLogEntryMatcher.group(1);
+ ipAddr = accessLogEntryMatcher.group(2);
+ url = accessLogEntryMatcher.group(6);
+ httpStatusCode = accessLogEntryMatcher.group(7);
+ numOfBytes = Long.parseLong(accessLogEntryMatcher.group(8));
+ referer = accessLogEntryMatcher.group(9);
+ agent = accessLogEntryMatcher.group(10);
+
+ outputIPAddress.emit(ipAddr);
+ outputUrl.emit(url);
+ outputStatusCode.emit(httpStatusCode);
+ Map<String, Integer> ipdata = new HashMap<String, Integer>();
+ ipdata.put(ipAddr, (int)numOfBytes);
+ outputBytes.emit(ipdata);
+ outputReferer.emit(referer);
+ outputAgent.emit(agent);
+ outputServerName.emit(serverName);
+ outputServerName1.emit(serverName);
+
+ HashMap<String, String> urlStatus = new HashMap<String, String>();
+ urlStatus.put(url, httpStatusCode);
+ outUrlStatus.emit(urlStatus);
+
+ HashMap<String, String> serverStatus = new HashMap<String, String>();
+ serverStatus.put(serverName, httpStatusCode);
+ outServerStatus.emit(serverStatus);
+
+ clientDataUsage.emit((int)numOfBytes);
+ viewCount.emit(new Integer(1));
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/DimensionAggregationUnifier.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/DimensionAggregationUnifier.java b/library/src/main/java/com/datatorrent/lib/logs/DimensionAggregationUnifier.java
index 2c6ad20..6c56b11 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/DimensionAggregationUnifier.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/DimensionAggregationUnifier.java
@@ -43,8 +43,10 @@ public class DimensionAggregationUnifier implements Operator
private Map<String, Map<String, MutableDouble>> dataMap = new HashMap<String, Map<String, MutableDouble>>();
- public final transient DefaultOutputPort<Map<String, DimensionObject<String>>> output = new DefaultOutputPort<Map<String, DimensionObject<String>>>();
- public final transient DefaultInputPort<Map<String, DimensionObject<String>>> input = new DefaultInputPort<Map<String, DimensionObject<String>>>() {
+ public final transient DefaultOutputPort<Map<String, DimensionObject<String>>> output = new DefaultOutputPort<>();
+
+ public final transient DefaultInputPort<Map<String, DimensionObject<String>>> input = new DefaultInputPort<Map<String, DimensionObject<String>>>()
+ {
@Override
public void process(Map<String, DimensionObject<String>> tuple)
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/DimensionObject.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/DimensionObject.java b/library/src/main/java/com/datatorrent/lib/logs/DimensionObject.java
index f6c402a..cba895f 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/DimensionObject.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/DimensionObject.java
@@ -35,7 +35,8 @@ public class DimensionObject<T> implements Comparable<DimensionObject<T>>
private T val;
@SuppressWarnings("unused")
- private DimensionObject(){
+ private DimensionObject()
+ {
}
@@ -86,12 +87,14 @@ public class DimensionObject<T> implements Comparable<DimensionObject<T>>
@Override
public boolean equals(Object obj)
{
- if (obj == null)
+ if (obj == null) {
return false;
- if (!this.getClass().equals(obj.getClass()))
+ }
+ if (!this.getClass().equals(obj.getClass())) {
return false;
+ }
@SuppressWarnings("unchecked")
- DimensionObject<T> obj2 = (DimensionObject<T>) obj;
+ DimensionObject<T> obj2 = (DimensionObject<T>)obj;
return this.val.equals(obj2.val);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenArrayList.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenArrayList.java b/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenArrayList.java
index 20dcf43..de0e1bd 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenArrayList.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenArrayList.java
@@ -18,9 +18,10 @@
*/
package com.datatorrent.lib.logs;
+import java.util.HashMap;
+
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.Stateless;
-import java.util.HashMap;
/**
* <p>
@@ -53,7 +54,7 @@ import java.util.HashMap;
* @since 0.3.2
*/
@Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
public class FilteredLineToTokenArrayList extends LineToTokenArrayList
{
HashMap<String, Object> filterBy = new HashMap<String, Object>();
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenHashMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenHashMap.java b/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenHashMap.java
index f67b0b7..c79e884 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenHashMap.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/FilteredLineToTokenHashMap.java
@@ -18,9 +18,10 @@
*/
package com.datatorrent.lib.logs;
+import java.util.HashMap;
+
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.Stateless;
-import java.util.HashMap;
/**
* <p>
@@ -55,35 +56,35 @@ import java.util.HashMap;
* @since 0.3.3
*/
@Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
public class FilteredLineToTokenHashMap extends LineToTokenHashMap
{
- HashMap<String, Object> filterBy = new HashMap<String, Object>(4);
+ HashMap<String, Object> filterBy = new HashMap<String, Object>(4);
- /**
- * setter function for filterBy
- *
- * @param list
- * list of keys for subtoken filters
- */
- public void setFilterBy(String[] list)
- {
- if (list != null) {
- for (String s : list) {
- filterBy.put(s, null);
- }
- }
- }
+ /**
+ * setter function for filterBy
+ *
+ * @param list
+ * list of keys for subtoken filters
+ */
+ public void setFilterBy(String[] list)
+ {
+ if (list != null) {
+ for (String s : list) {
+ filterBy.put(s, null);
+ }
+ }
+ }
- /**
- * If the key is in the filter, returns true
- *
- * @param subtok
- * @return true if super.validToken (!isEmpty()) and filter has they token
- */
- @Override
- public boolean validSubTokenKey(String subtok)
- {
- return super.validToken(subtok) && filterBy.containsKey(subtok);
- }
+ /**
+ * If the key is in the filter, returns true
+ *
+ * @param subtok
+ * @return true if super.validToken (!isEmpty()) and filter has they token
+ */
+ @Override
+ public boolean validSubTokenKey(String subtok)
+ {
+ return super.validToken(subtok) && filterBy.containsKey(subtok);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/FilteredLineTokenizerKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/FilteredLineTokenizerKeyVal.java b/library/src/main/java/com/datatorrent/lib/logs/FilteredLineTokenizerKeyVal.java
index 37e66ff..a6acd4c 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/FilteredLineTokenizerKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/FilteredLineTokenizerKeyVal.java
@@ -18,9 +18,10 @@
*/
package com.datatorrent.lib.logs;
+import java.util.HashMap;
+
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.Stateless;
-import java.util.HashMap;
/**
* Splits lines into tokens, and tokens into sub-tokens and emits key,val pairs in a HashMap.
@@ -52,35 +53,35 @@ import java.util.HashMap;
* @since 0.3.3
*/
@Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
public class FilteredLineTokenizerKeyVal extends LineTokenizerKeyVal
{
- HashMap<String, Object> filterBy = new HashMap<String, Object>(4);
+ HashMap<String, Object> filterBy = new HashMap<String, Object>(4);
- /**
- * setter function for filterBy
- *
- * @param list
- * list of keys for subtoken filters
- */
- public void setFilterBy(String[] list)
- {
- if (list != null) {
- for (String s : list) {
- filterBy.put(s, null);
- }
- }
- }
+ /**
+ * setter function for filterBy
+ *
+ * @param list
+ * list of keys for subtoken filters
+ */
+ public void setFilterBy(String[] list)
+ {
+ if (list != null) {
+ for (String s : list) {
+ filterBy.put(s, null);
+ }
+ }
+ }
- /**
- * If the key is in the filter, returns true
- *
- * @param subtok
- * @return true if super.validToken (!isEmpty()) and filter has they token
- */
- @Override
- public boolean validSubTokenKey(String subtok)
- {
- return super.validToken(subtok) && filterBy.containsKey(subtok);
- }
+ /**
+ * If the key is in the filter, returns true
+ *
+ * @param subtok
+ * @return true if super.validToken (!isEmpty()) and filter has they token
+ */
+ @Override
+ public boolean validSubTokenKey(String subtok)
+ {
+ return super.validToken(subtok) && filterBy.containsKey(subtok);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/LineToTokenArrayList.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/LineToTokenArrayList.java b/library/src/main/java/com/datatorrent/lib/logs/LineToTokenArrayList.java
index ffc211d..182078b 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/LineToTokenArrayList.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/LineToTokenArrayList.java
@@ -56,7 +56,7 @@ import com.datatorrent.lib.util.UnifierArrayList;
* @since 0.3.2
*/
@Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
public class LineToTokenArrayList extends BaseLineTokenizer
{
protected transient ArrayList<String> tokentuple = null;
@@ -79,15 +79,16 @@ public class LineToTokenArrayList extends BaseLineTokenizer
};
/**
- * This output port emits a map from tokens to sub tokens.
- */
+ * This output port emits a map from tokens to sub tokens.
+ */
@OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<ArrayList<HashMap<String, ArrayList<String>>>> splittokens = new DefaultOutputPort<ArrayList<HashMap<String, ArrayList<String>>>>()
+ public final transient DefaultOutputPort<ArrayList<HashMap<String, ArrayList<String>>>> splittokens =
+ new DefaultOutputPort<ArrayList<HashMap<String, ArrayList<String>>>>()
{
@Override
public Unifier<ArrayList<HashMap<String, ArrayList<String>>>> getUnifier()
{
- return new UnifierArrayList<HashMap<String, ArrayList<String>>>();
+ return new UnifierArrayList<>();
}
};
@@ -152,8 +153,7 @@ public class LineToTokenArrayList extends BaseLineTokenizer
{
if (smap.isEmpty()) {
smap.put(subtok, vals);
- }
- else {
+ } else {
vals.add(subtok);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/LineToTokenHashMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/LineToTokenHashMap.java b/library/src/main/java/com/datatorrent/lib/logs/LineToTokenHashMap.java
index d0a67f3..0060c74 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/LineToTokenHashMap.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/LineToTokenHashMap.java
@@ -54,7 +54,7 @@ import com.datatorrent.lib.util.UnifierHashMap;
* @since 0.3.2
*/
@Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
public class LineToTokenHashMap extends BaseLineTokenizer
{
/**
@@ -103,8 +103,7 @@ public class LineToTokenHashMap extends BaseLineTokenizer
if (vals == null) {
tok = subtok;
vals = new ArrayList<String>();
- }
- else {
+ } else {
vals.add(subtok);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/LineTokenizer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/LineTokenizer.java b/library/src/main/java/com/datatorrent/lib/logs/LineTokenizer.java
index 1ddd40c..814d132 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/LineTokenizer.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/LineTokenizer.java
@@ -19,7 +19,8 @@
package com.datatorrent.lib.logs;
import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.*;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.lib.util.BaseLineTokenizer;
/**
@@ -47,25 +48,25 @@ import com.datatorrent.lib.util.BaseLineTokenizer;
* @since 0.3.3
*/
@Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
public class LineTokenizer extends BaseLineTokenizer
{
/**
* The is the output port that emits string tokens.
*/
- public final transient DefaultOutputPort<String> tokens = new DefaultOutputPort<String>();
+ public final transient DefaultOutputPort<String> tokens = new DefaultOutputPort<String>();
- /**
- * emits tokens on port "tokens" if tok is not empty
- *
- * @param tok
- */
- @Override
- public void processToken(String tok)
- {
- if (!tok.isEmpty()) {
- tokens.emit(tok);
- }
- }
+ /**
+ * emits tokens on port "tokens" if tok is not empty
+ *
+ * @param tok
+ */
+ @Override
+ public void processToken(String tok)
+ {
+ if (!tok.isEmpty()) {
+ tokens.emit(tok);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/LineTokenizerKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/LineTokenizerKeyVal.java b/library/src/main/java/com/datatorrent/lib/logs/LineTokenizerKeyVal.java
index 2a568d1..0d89996 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/LineTokenizerKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/LineTokenizerKeyVal.java
@@ -18,11 +18,13 @@
*/
package com.datatorrent.lib.logs;
+import java.util.HashMap;
+
import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.*;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.lib.util.BaseLineTokenizer;
import com.datatorrent.lib.util.UnifierHashMap;
-import java.util.HashMap;
/**
* This operator splits lines into tokens, and tokens into sub-tokens.
@@ -44,7 +46,8 @@ import java.util.HashMap;
* <br>
* <b>Properties</b>:<br>
* <b>splitby</b>: The characters used to split the line. Default is ";\t "<br>
- * <b>splittokenby</b>: The characters used to split a token into key,val pair. Default is "", i.e. tokens are not split, and key is set to token, and val is null<br>
+ * <b>splittokenby</b>: The characters used to split a token into key,val pair. Default is "", i.e. tokens are not
+ * split, and key is set to token, and val is null<br>
* <br>
* </p>
* @displayName Line Tokenizer Key Value
@@ -54,7 +57,7 @@ import java.util.HashMap;
* @since 0.3.2
*/
@Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
public class LineTokenizerKeyVal extends BaseLineTokenizer
{
/**
@@ -129,8 +132,7 @@ public class LineTokenizerKeyVal extends BaseLineTokenizer
{
if (skey.isEmpty()) {
skey = subtok;
- }
- else if (sval.isEmpty()) {
+ } else if (sval.isEmpty()) {
sval = subtok;
}
}