Posted to commits@apex.apache.org by th...@apache.org on 2017/05/23 01:24:04 UTC
[06/13] apex-malhar git commit: Changed package path for files to be
included under malhar. Modifications to build files for project to build
under malhar.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/sink/Server.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/sink/Server.java b/flume/src/main/java/com/datatorrent/flume/sink/Server.java
deleted file mode 100644
index 03c1ff0..0000000
--- a/flume/src/main/java/com/datatorrent/flume/sink/Server.java
+++ /dev/null
@@ -1,420 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.sink;
-
-import java.net.InetSocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.flume.discovery.Discovery;
-import com.datatorrent.flume.discovery.Discovery.Service;
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
-import com.datatorrent.netlet.AbstractServer;
-import com.datatorrent.netlet.EventLoop;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * <p>
- * Server class.</p>
- *
- * @since 0.9.2
- */
-public class Server extends AbstractServer
-{
- private final String id;
- private final Discovery<byte[]> discovery;
- private final long acceptedTolerance;
-
- public Server(String id, Discovery<byte[]> discovery, long acceptedTolerance)
- {
- this.id = id;
- this.discovery = discovery;
- this.acceptedTolerance = acceptedTolerance;
- }
-
- @Override
- public void handleException(Exception cce, EventLoop el)
- {
- logger.error("Server Error", cce);
- Request r = new Request(Command.SERVER_ERROR, null)
- {
- @Override
- public Slice getAddress()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public int getEventCount()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public int getIdleCount()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- };
- synchronized (requests) {
- requests.add(r);
- }
- }
-
- private final Service<byte[]> service = new Service<byte[]>()
- {
- @Override
- public String getHost()
- {
- return ((InetSocketAddress)getServerAddress()).getHostName();
- }
-
- @Override
- public int getPort()
- {
- return ((InetSocketAddress)getServerAddress()).getPort();
- }
-
- @Override
- public byte[] getPayload()
- {
- return null;
- }
-
- @Override
- public String getId()
- {
- return id;
- }
-
- @Override
- public String toString()
- {
- return "Server.Service{id=" + id + ", host=" + getHost() + ", port=" + getPort() + ", payload=" +
- Arrays.toString(getPayload()) + '}';
- }
-
- };
-
- @Override
- public void unregistered(final SelectionKey key)
- {
- discovery.unadvertise(service);
- super.unregistered(key);
- }
-
- @Override
- public void registered(final SelectionKey key)
- {
- super.registered(key);
- discovery.advertise(service);
- }
-
- public enum Command
- {
- ECHO((byte)0),
- SEEK((byte)1),
- COMMITTED((byte)2),
- CHECKPOINTED((byte)3),
- CONNECTED((byte)4),
- DISCONNECTED((byte)5),
- WINDOWED((byte)6),
- SERVER_ERROR((byte)7);
-
- Command(byte b)
- {
- this.ord = b;
- }
-
- public byte getOrdinal()
- {
- return ord;
- }
-
- public static Command getCommand(byte b)
- {
- Command c;
- switch (b) {
- case 0:
- c = ECHO;
- break;
-
- case 1:
- c = SEEK;
- break;
-
- case 2:
- c = COMMITTED;
- break;
-
- case 3:
- c = CHECKPOINTED;
- break;
-
- case 4:
- c = CONNECTED;
- break;
-
- case 5:
- c = DISCONNECTED;
- break;
-
- case 6:
- c = WINDOWED;
- break;
-
- case 7:
- c = SERVER_ERROR;
- break;
-
- default:
-          throw new IllegalArgumentException(String.format("No Command defined for ordinal %d", b));
- }
-
- assert (b == c.ord);
- return c;
- }
-
- private final byte ord;
- }
-
- public final ArrayList<Request> requests = new ArrayList<Request>(4);
-
- @Override
- public ClientListener getClientConnection(SocketChannel sc, ServerSocketChannel ssc)
- {
- Client lClient = new Client();
- lClient.connected();
- return lClient;
- }
-
- public class Client extends AbstractLengthPrependerClient
- {
-
- @Override
- public void onMessage(byte[] buffer, int offset, int size)
- {
- if (size != Request.FIXED_SIZE) {
- logger.warn("Invalid Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size),
- key.channel());
- return;
- }
-
- long requestTime = Server.readLong(buffer, offset + Request.TIME_OFFSET);
- if (System.currentTimeMillis() > (requestTime + acceptedTolerance)) {
- logger.warn("Expired Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size),
- key.channel());
- return;
- }
-
- try {
- if (Command.getCommand(buffer[offset]) == Command.ECHO) {
- write(buffer, offset, size);
- return;
- }
- } catch (IllegalArgumentException ex) {
- logger.warn("Invalid Request Received: {} from {}!", Arrays.copyOfRange(buffer, offset, offset + size),
- key.channel(), ex);
- return;
- }
-
- Request r = Request.getRequest(buffer, offset, this);
- synchronized (requests) {
- requests.add(r);
- }
- }
-
- @Override
- public void disconnected()
- {
- synchronized (requests) {
- requests.add(Request.getRequest(
- new byte[] {Command.DISCONNECTED.getOrdinal(), 0, 0, 0, 0, 0, 0, 0, 0}, 0, this));
- }
- super.disconnected();
- }
-
- public boolean write(byte[] address, Slice event)
- {
- if (event.offset == 0 && event.length == event.buffer.length) {
- return write(address, event.buffer);
- }
-
- // a better method would be to replace the write implementation and allow it to natively support writing slices
- return write(address, event.toByteArray());
- }
-
- }
-
- public abstract static class Request
- {
- public static final int FIXED_SIZE = 17;
- public static final int TIME_OFFSET = 9;
- public final Command type;
- public final Client client;
-
- public Request(Command type, Client client)
- {
- this.type = type;
- this.client = client;
- }
-
- public abstract Slice getAddress();
-
- public abstract int getEventCount();
-
- public abstract int getIdleCount();
-
- @Override
- public String toString()
- {
- return "Request{" + "type=" + type + '}';
- }
-
- public static Request getRequest(final byte[] buffer, final int offset, Client client)
- {
- Command command = Command.getCommand(buffer[offset]);
- switch (command) {
- case WINDOWED:
- return new Request(Command.WINDOWED, client)
- {
- final int eventCount;
- final int idleCount;
-
- {
- eventCount = Server.readInt(buffer, offset + 1);
- idleCount = Server.readInt(buffer, offset + 5);
- }
-
- @Override
- public Slice getAddress()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getEventCount()
- {
- return eventCount;
- }
-
- @Override
- public int getIdleCount()
- {
- return idleCount;
- }
-
- @Override
- public String toString()
- {
- return "Request{" + "type=" + type + ", eventCount=" + eventCount + ", idleCount=" + idleCount + '}';
- }
-
- };
-
- default:
- return new Request(command, client)
- {
- final Slice address;
-
- {
- address = new Slice(buffer, offset + 1, 8);
- }
-
- @Override
- public Slice getAddress()
- {
- return address;
- }
-
- @Override
- public int getEventCount()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getIdleCount()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String toString()
- {
- return "Request{" + "type=" + type + ", address=" + address + '}';
- }
-
- };
-
- }
-
- }
-
- }
-
- public static int readInt(byte[] buffer, int offset)
- {
- return buffer[offset++] & 0xff
- | (buffer[offset++] & 0xff) << 8
- | (buffer[offset++] & 0xff) << 16
- | (buffer[offset++] & 0xff) << 24;
- }
-
- public static void writeInt(byte[] buffer, int offset, int i)
- {
- buffer[offset++] = (byte)i;
- buffer[offset++] = (byte)(i >>> 8);
- buffer[offset++] = (byte)(i >>> 16);
- buffer[offset++] = (byte)(i >>> 24);
- }
-
- public static long readLong(byte[] buffer, int offset)
- {
- return (long)buffer[offset++] & 0xff
- | (long)(buffer[offset++] & 0xff) << 8
- | (long)(buffer[offset++] & 0xff) << 16
- | (long)(buffer[offset++] & 0xff) << 24
- | (long)(buffer[offset++] & 0xff) << 32
- | (long)(buffer[offset++] & 0xff) << 40
- | (long)(buffer[offset++] & 0xff) << 48
- | (long)(buffer[offset++] & 0xff) << 56;
- }
-
- public static void writeLong(byte[] buffer, int offset, long l)
- {
- buffer[offset++] = (byte)l;
- buffer[offset++] = (byte)(l >>> 8);
- buffer[offset++] = (byte)(l >>> 16);
- buffer[offset++] = (byte)(l >>> 24);
- buffer[offset++] = (byte)(l >>> 32);
- buffer[offset++] = (byte)(l >>> 40);
- buffer[offset++] = (byte)(l >>> 48);
- buffer[offset++] = (byte)(l >>> 56);
- }
-
- private static final Logger logger = LoggerFactory.getLogger(Server.class);
-}
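
The deleted Server above frames every sink request as a fixed 17-byte message: a command byte at offset 0, an 8-byte address, and an 8-byte little-endian timestamp at Request.TIME_OFFSET (9), which onMessage checks against acceptedTolerance. Below is a minimal standalone sketch of that wire layout, using the same little-endian scheme as Server.writeLong/readLong; the class name and sample values are hypothetical, not part of this commit.

    // Hypothetical sketch of the 17-byte request layout handled by Server.Client.onMessage.
    public class RequestLayoutSketch
    {
      // Same little-endian encoding as Server.writeLong
      static void writeLong(byte[] buffer, int offset, long l)
      {
        for (int i = 0; i < 8; i++) {
          buffer[offset + i] = (byte)(l >>> (8 * i));
        }
      }

      // Same little-endian decoding as Server.readLong
      static long readLong(byte[] buffer, int offset)
      {
        long l = 0;
        for (int i = 0; i < 8; i++) {
          l |= ((long)buffer[offset + i] & 0xff) << (8 * i);
        }
        return l;
      }

      public static void main(String[] args)
      {
        byte[] request = new byte[17];                      // Request.FIXED_SIZE
        request[0] = 2;                                     // Command.COMMITTED ordinal
        writeLong(request, 1, 42L);                         // 8-byte replay address
        writeLong(request, 9, System.currentTimeMillis());  // Request.TIME_OFFSET = 9
        long requestTime = readLong(request, 9);
        System.out.println("expired: " + (System.currentTimeMillis() > requestTime + 5000L));
      }
    }

onMessage drops a request when its size differs from FIXED_SIZE or when the embedded timestamp is older than acceptedTolerance milliseconds.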
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java b/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java
deleted file mode 100644
index 72e1913..0000000
--- a/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.source;
-
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import javax.annotation.Nonnull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDrivenSource;
-import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.event.EventBuilder;
-import org.apache.flume.source.AbstractSource;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-/**
- * <p>HdfsTestSource class.</p>
- *
- * @since 0.9.4
- */
-public class HdfsTestSource extends AbstractSource implements EventDrivenSource, Configurable
-{
- public static final String SOURCE_DIR = "sourceDir";
- public static final String RATE = "rate";
- public static final String INIT_DATE = "initDate";
-
- static byte FIELD_SEPARATOR = 2;
- public Timer emitTimer;
- @Nonnull
- String directory;
- Path directoryPath;
- int rate;
- String initDate;
- long initTime;
- List<String> dataFiles;
- long oneDayBack;
-
- private transient BufferedReader br = null;
- protected transient FileSystem fs;
- private transient Configuration configuration;
-
- private transient int currentFile = 0;
- private transient boolean finished;
- private List<Event> events;
-
- public HdfsTestSource()
- {
- super();
- this.rate = 2500;
- dataFiles = Lists.newArrayList();
- Calendar calendar = Calendar.getInstance();
- calendar.add(Calendar.DATE, -1);
- oneDayBack = calendar.getTimeInMillis();
- configuration = new Configuration();
- events = Lists.newArrayList();
- }
-
- @Override
- public void configure(Context context)
- {
- directory = context.getString(SOURCE_DIR);
- rate = context.getInteger(RATE, rate);
- initDate = context.getString(INIT_DATE);
-
- Preconditions.checkArgument(!Strings.isNullOrEmpty(directory));
- directoryPath = new Path(directory);
-
- String[] parts = initDate.split("-");
- Preconditions.checkArgument(parts.length == 3);
- Calendar calendar = Calendar.getInstance();
- calendar.set(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) - 1, Integer.parseInt(parts[2]), 0, 0, 0);
- initTime = calendar.getTimeInMillis();
-
- try {
- List<String> files = findFiles();
- for (String file : files) {
- dataFiles.add(file);
- }
- if (logger.isDebugEnabled()) {
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
- logger.debug("settings {} {} {} {} {}", directory, rate, dateFormat.format(oneDayBack),
- dateFormat.format(new Date(initTime)), currentFile);
- for (String file : dataFiles) {
- logger.debug("settings add file {}", file);
- }
- }
-
- fs = FileSystem.newInstance(new Path(directory).toUri(), configuration);
- Path filePath = new Path(dataFiles.get(currentFile));
- br = new BufferedReader(new InputStreamReader(new GzipCompressorInputStream(fs.open(filePath))));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- finished = true;
-
- }
-
- private List<String> findFiles() throws IOException
- {
- List<String> files = Lists.newArrayList();
- Path directoryPath = new Path(directory);
- FileSystem lfs = FileSystem.newInstance(directoryPath.toUri(), configuration);
- try {
- logger.debug("checking for new files in {}", directoryPath);
- RemoteIterator<LocatedFileStatus> statuses = lfs.listFiles(directoryPath, true);
-      while (statuses.hasNext()) {
- FileStatus status = statuses.next();
- Path path = status.getPath();
- String filePathStr = path.toString();
- if (!filePathStr.endsWith(".gz")) {
- continue;
- }
- logger.debug("new file {}", filePathStr);
- files.add(path.toString());
- }
- } catch (FileNotFoundException e) {
- logger.warn("Failed to list directory {}", directoryPath, e);
- throw new RuntimeException(e);
- } finally {
- lfs.close();
- }
- return files;
- }
-
- @Override
- public void start()
- {
- super.start();
- emitTimer = new Timer();
-
- final ChannelProcessor channelProcessor = getChannelProcessor();
- emitTimer.scheduleAtFixedRate(new TimerTask()
- {
- @Override
- public void run()
- {
- int lineCount = 0;
- events.clear();
- try {
- while (lineCount < rate && !finished) {
- String line = br.readLine();
-
- if (line == null) {
- logger.debug("completed file {}", currentFile);
- br.close();
- currentFile++;
- if (currentFile == dataFiles.size()) {
- logger.info("finished all files");
- finished = true;
- break;
- }
- Path filePath = new Path(dataFiles.get(currentFile));
- br = new BufferedReader(new InputStreamReader(new GzipCompressorInputStream(fs.open(filePath))));
- logger.info("opening file {}. {}", currentFile, filePath);
- continue;
- }
- lineCount++;
- Event flumeEvent = EventBuilder.withBody(line.getBytes());
- events.add(flumeEvent);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- if (events.size() > 0) {
- channelProcessor.processEventBatch(events);
- }
- if (finished) {
- emitTimer.cancel();
- }
- }
-
- }, 0, 1000);
- }
-
- @Override
- public void stop()
- {
- emitTimer.cancel();
- super.stop();
- }
-
- private static final Logger logger = LoggerFactory.getLogger(HdfsTestSource.class);
-}
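
HdfsTestSource.configure() reads three context keys: sourceDir (required, scanned recursively for .gz files), rate (lines emitted per second, default 2500) and initDate (yyyy-MM-dd). A minimal sketch of driving it from a test harness follows; the directory path is hypothetical and must already contain gzipped files, since configure() opens the first one immediately.

    import org.apache.flume.Context;

    import com.datatorrent.flume.source.HdfsTestSource;

    public class HdfsTestSourceConfigSketch
    {
      public static void main(String[] args)
      {
        Context context = new Context();
        context.put("sourceDir", "/tmp/flume-test-data"); // SOURCE_DIR: scanned for *.gz
        context.put("rate", "1000");                      // RATE: lines per second
        context.put("initDate", "2014-01-01");            // INIT_DATE: yyyy-MM-dd

        HdfsTestSource source = new HdfsTestSource();
        source.configure(context);                        // lists files, opens the first one
      }
    }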
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/source/TestSource.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/source/TestSource.java b/flume/src/main/java/com/datatorrent/flume/source/TestSource.java
deleted file mode 100644
index 5773de3..0000000
--- a/flume/src/main/java/com/datatorrent/flume/source/TestSource.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.source;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import javax.annotation.Nonnull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDrivenSource;
-import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.event.EventBuilder;
-import org.apache.flume.source.AbstractSource;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * <p>TestSource class.</p>
- *
- * @since 0.9.4
- */
-public class TestSource extends AbstractSource implements EventDrivenSource, Configurable
-{
- public static final String SOURCE_FILE = "sourceFile";
- public static final String LINE_NUMBER = "lineNumber";
- public static final String RATE = "rate";
- public static final String PERCENT_PAST_EVENTS = "percentPastEvents";
- static byte FIELD_SEPARATOR = 1;
- static int DEF_PERCENT_PAST_EVENTS = 5;
- public Timer emitTimer;
- @Nonnull
- String filePath;
- int rate;
- int numberOfPastEvents;
- transient List<Row> cache;
- private transient int startIndex;
- private transient Random random;
- private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
- private SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
- public TestSource()
- {
- super();
- this.rate = 2500;
- this.numberOfPastEvents = DEF_PERCENT_PAST_EVENTS * 25;
- this.random = new Random();
-
- }
-
- @Override
- public void configure(Context context)
- {
- filePath = context.getString(SOURCE_FILE);
- rate = context.getInteger(RATE, rate);
- int percentPastEvents = context.getInteger(PERCENT_PAST_EVENTS, DEF_PERCENT_PAST_EVENTS);
- Preconditions.checkArgument(!Strings.isNullOrEmpty(filePath));
- try {
- BufferedReader lineReader = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
- try {
- buildCache(lineReader);
- } finally {
- lineReader.close();
- }
- } catch (FileNotFoundException e) {
- throw new RuntimeException(e);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- if (DEF_PERCENT_PAST_EVENTS != percentPastEvents) {
- numberOfPastEvents = (int)(percentPastEvents / 100.0 * cache.size());
- }
- }
-
- @Override
- public void start()
- {
- super.start();
- emitTimer = new Timer();
-
- final ChannelProcessor channel = getChannelProcessor();
- final int cacheSize = cache.size();
- emitTimer.scheduleAtFixedRate(new TimerTask()
- {
- @Override
- public void run()
- {
- int lastIndex = startIndex + rate;
- if (lastIndex > cacheSize) {
- lastIndex -= cacheSize;
- processBatch(channel, cache.subList(startIndex, cacheSize));
- startIndex = 0;
- while (lastIndex > cacheSize) {
- processBatch(channel, cache);
- lastIndex -= cacheSize;
- }
- processBatch(channel, cache.subList(0, lastIndex));
- } else {
- processBatch(channel, cache.subList(startIndex, lastIndex));
- }
- startIndex = lastIndex;
- }
-
- }, 0, 1000);
- }
-
- private void processBatch(ChannelProcessor channelProcessor, List<Row> rows)
- {
- if (rows.isEmpty()) {
- return;
- }
-
- int noise = random.nextInt(numberOfPastEvents + 1);
- Set<Integer> pastIndices = Sets.newHashSet();
- for (int i = 0; i < noise; i++) {
- pastIndices.add(random.nextInt(rows.size()));
- }
-
- Calendar calendar = Calendar.getInstance();
- long high = calendar.getTimeInMillis();
- calendar.add(Calendar.DATE, -2);
- long low = calendar.getTimeInMillis();
-
-
-
- List<Event> events = Lists.newArrayList();
- for (int i = 0; i < rows.size(); i++) {
- Row eventRow = rows.get(i);
- if (pastIndices.contains(i)) {
- long pastTime = (long)((Math.random() * (high - low)) + low);
- byte[] pastDateField = dateFormat.format(pastTime).getBytes();
- byte[] pastTimeField = timeFormat.format(pastTime).getBytes();
-
- System.arraycopy(pastDateField, 0, eventRow.bytes, eventRow.dateFieldStart, pastDateField.length);
- System.arraycopy(pastTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, pastTimeField.length);
- } else {
- calendar.setTimeInMillis(System.currentTimeMillis());
- byte[] currentDateField = dateFormat.format(calendar.getTime()).getBytes();
- byte[] currentTimeField = timeFormat.format(calendar.getTime()).getBytes();
-
- System.arraycopy(currentDateField, 0, eventRow.bytes, eventRow.dateFieldStart, currentDateField.length);
- System.arraycopy(currentTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, currentTimeField.length);
- }
-
- HashMap<String, String> headers = new HashMap<String, String>(2);
- headers.put(SOURCE_FILE, filePath);
- headers.put(LINE_NUMBER, String.valueOf(startIndex + i));
- events.add(EventBuilder.withBody(eventRow.bytes, headers));
- }
- channelProcessor.processEventBatch(events);
- }
-
- @Override
- public void stop()
- {
- emitTimer.cancel();
- super.stop();
- }
-
- private void buildCache(BufferedReader lineReader) throws IOException
- {
- cache = Lists.newArrayListWithCapacity(rate);
-
- String line;
- while ((line = lineReader.readLine()) != null) {
- byte[] row = line.getBytes();
- Row eventRow = new Row(row);
- final int rowsize = row.length;
-
- /* guid */
-      int sliceLength = -1;
-      while (++sliceLength < rowsize) {
-        if (row[sliceLength] == FIELD_SEPARATOR) {
-          break;
-        }
-      }
-      int recordStart = sliceLength + 1;
-      int pointer = sliceLength + 1;
- while (pointer < rowsize) {
- if (row[pointer++] == FIELD_SEPARATOR) {
- eventRow.dateFieldStart = recordStart;
- break;
- }
- }
-
-      /* now locate the time field */
- int dateStart = pointer;
- while (pointer < rowsize) {
- if (row[pointer++] == FIELD_SEPARATOR) {
- eventRow.timeFieldStart = dateStart;
- break;
- }
- }
-
- cache.add(eventRow);
- }
- }
-
- private static class Row
- {
- final byte[] bytes;
- int dateFieldStart;
- int timeFieldStart;
-// boolean past;
-
- Row(byte[] bytes)
- {
- this.bytes = bytes;
- }
-
- }
-
- private static final Logger logger = LoggerFactory.getLogger(TestSource.class);
-}
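
The timer task in TestSource.start() treats the cached rows as a ring: each one-second tick emits exactly rate rows, wrapping from the end of the cache back to the beginning and looping whole passes when rate exceeds the cache size. A standalone sketch of that index arithmetic, with hypothetical names and sample data:

    import java.util.Arrays;
    import java.util.List;

    public class CircularBatchSketch
    {
      // Mirrors the wrap-around logic of TestSource.start(); returns the next startIndex.
      static int emit(List<String> cache, int startIndex, int rate)
      {
        int cacheSize = cache.size();
        int lastIndex = startIndex + rate;
        if (lastIndex > cacheSize) {
          lastIndex -= cacheSize;
          process(cache.subList(startIndex, cacheSize)); // tail of the cache
          startIndex = 0;
          while (lastIndex > cacheSize) {                // whole passes when rate >> cacheSize
            process(cache);
            lastIndex -= cacheSize;
          }
          process(cache.subList(0, lastIndex));          // wrapped head
        } else {
          process(cache.subList(startIndex, lastIndex));
        }
        return lastIndex;
      }

      static void process(List<String> batch)
      {
        System.out.println("batch: " + batch);
      }

      public static void main(String[] args)
      {
        List<String> cache = Arrays.asList("a", "b", "c", "d", "e");
        int next = emit(cache, 3, 4); // emits [d, e] then [a, b]; next == 2
        System.out.println("next startIndex: " + next);
      }
    }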
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java b/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java
deleted file mode 100644
index da94154..0000000
--- a/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.storage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Context;
-import org.apache.flume.conf.Configurable;
-
-import com.datatorrent.api.Component;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * <p>DebugWrapper class.</p>
- *
- * @since 0.9.4
- */
-public class DebugWrapper implements Storage, Configurable, Component<com.datatorrent.api.Context>
-{
- HDFSStorage storage = new HDFSStorage();
-
- @Override
- public byte[] store(Slice bytes)
- {
- byte[] ret = null;
-
- try {
- ret = storage.store(bytes);
- } finally {
- logger.debug("storage.store(new byte[]{{}});", bytes);
- }
-
- return ret;
- }
-
- @Override
- public byte[] retrieve(byte[] identifier)
- {
- byte[] ret = null;
-
- try {
- ret = storage.retrieve(identifier);
- } finally {
- logger.debug("storage.retrieve(new byte[]{{}});", identifier);
- }
-
- return ret;
- }
-
- @Override
- public byte[] retrieveNext()
- {
- byte[] ret = null;
- try {
- ret = storage.retrieveNext();
- } finally {
- logger.debug("storage.retrieveNext();");
- }
-
- return ret;
- }
-
- @Override
- public void clean(byte[] identifier)
- {
- try {
- storage.clean(identifier);
- } finally {
- logger.debug("storage.clean(new byte[]{{}});", identifier);
- }
- }
-
- @Override
- public void flush()
- {
- try {
- storage.flush();
- } finally {
- logger.debug("storage.flush();");
- }
- }
-
- @Override
- public void configure(Context cntxt)
- {
- try {
- storage.configure(cntxt);
- } finally {
- logger.debug("storage.configure({});", cntxt);
- }
- }
-
- @Override
- public void setup(com.datatorrent.api.Context t1)
- {
- try {
- storage.setup(t1);
- } finally {
- logger.debug("storage.setup({});", t1);
- }
-
- }
-
- @Override
- public void teardown()
- {
- try {
- storage.teardown();
- } finally {
- logger.debug("storage.teardown();");
- }
- }
-
- private static final Logger logger = LoggerFactory.getLogger(DebugWrapper.class);
-}
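
DebugWrapper is a plain decorator: every Storage method delegates to the wrapped HDFSStorage inside a try block and logs the call in a finally block, so the call is recorded even when the delegate throws. A hypothetical, trimmed-down sketch of the same pattern:

    public class LoggingDecoratorSketch
    {
      interface Store
      {
        void flush();
      }

      // Wraps a Store so every call is logged, even when the delegate throws.
      static Store logged(final Store delegate)
      {
        return new Store()
        {
          @Override
          public void flush()
          {
            try {
              delegate.flush();
            } finally {
              System.out.println("storage.flush();"); // mirrors DebugWrapper's logger.debug
            }
          }
        };
      }
    }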
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java b/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java
deleted file mode 100644
index 76f663c..0000000
--- a/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.storage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Event;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * <p>ErrorMaskingEventCodec class.</p>
- *
- * @since 1.0.4
- */
-public class ErrorMaskingEventCodec extends EventCodec
-{
-
- @Override
- public Object fromByteArray(Slice fragment)
- {
- try {
- return super.fromByteArray(fragment);
- } catch (RuntimeException re) {
- logger.warn("Cannot deserialize event {}", fragment, re);
- }
-
- return null;
- }
-
- @Override
- public Slice toByteArray(Event event)
- {
- try {
- return super.toByteArray(event);
- } catch (RuntimeException re) {
- logger.warn("Cannot serialize event {}", event, re);
- }
-
- return null;
- }
-
-
- private static final Logger logger = LoggerFactory.getLogger(ErrorMaskingEventCodec.class);
-}
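
ErrorMaskingEventCodec trades exceptions for nulls: a slice that would make the parent EventCodec throw is logged and swallowed, so a single corrupt record does not take the operator down. A hypothetical usage sketch; the two-byte payload is arbitrary and will typically fail Kryo deserialization:

    import com.datatorrent.flume.storage.ErrorMaskingEventCodec;
    import com.datatorrent.netlet.util.Slice;

    public class ErrorMaskingSketch
    {
      public static void main(String[] args)
      {
        ErrorMaskingEventCodec codec = new ErrorMaskingEventCodec();
        Slice corrupt = new Slice(new byte[] {0x01, 0x02}, 0, 2);
        Object event = codec.fromByteArray(corrupt); // logs "Cannot deserialize event", returns null
        System.out.println(event == null ? "masked" : "decoded");
      }
    }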
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java b/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java
deleted file mode 100644
index 0ece548..0000000
--- a/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.storage;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Event;
-import org.apache.flume.event.EventBuilder;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * <p>EventCodec class.</p>
- *
- * @since 0.9.4
- */
-public class EventCodec implements StreamCodec<Event>
-{
- private final transient Kryo kryo;
-
- public EventCodec()
- {
- this.kryo = new Kryo();
- this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
- }
-
- @Override
- public Object fromByteArray(Slice fragment)
- {
- ByteArrayInputStream is = new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length);
- Input input = new Input(is);
-
- @SuppressWarnings("unchecked")
- HashMap<String, String> headers = kryo.readObjectOrNull(input, HashMap.class);
- byte[] body = kryo.readObjectOrNull(input, byte[].class);
- return EventBuilder.withBody(body, headers);
- }
-
- @Override
- public Slice toByteArray(Event event)
- {
- ByteArrayOutputStream os = new ByteArrayOutputStream();
- Output output = new Output(os);
-
- Map<String, String> headers = event.getHeaders();
- if (headers != null && headers.getClass() != HashMap.class) {
- HashMap<String, String> tmp = new HashMap<String, String>(headers.size());
- tmp.putAll(headers);
- headers = tmp;
- }
- kryo.writeObjectOrNull(output, headers, HashMap.class);
- kryo.writeObjectOrNull(output, event.getBody(), byte[].class);
- output.flush();
- final byte[] bytes = os.toByteArray();
- return new Slice(bytes, 0, bytes.length);
- }
-
- @Override
- public int getPartition(Event o)
- {
- return o.hashCode();
- }
-
- private static final Logger logger = LoggerFactory.getLogger(EventCodec.class);
-}
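
EventCodec serializes a Flume event as two Kryo objects: the headers map (normalized to a HashMap) followed by the body byte array. A round-trip sketch, under the assumption that flume-ng-core, kryo and netlet are on the classpath:

    import java.util.HashMap;

    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;

    import com.datatorrent.flume.storage.EventCodec;
    import com.datatorrent.netlet.util.Slice;

    public class EventCodecRoundTrip
    {
      public static void main(String[] args)
      {
        EventCodec codec = new EventCodec();

        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("lineNumber", "42");
        Event in = EventBuilder.withBody("payload".getBytes(), headers);

        Slice slice = codec.toByteArray(in);           // headers map, then body bytes
        Event out = (Event)codec.fromByteArray(slice);

        System.out.println(new String(out.getBody()));           // payload
        System.out.println(out.getHeaders().get("lineNumber"));  // 42
      }
    }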
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java b/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java
deleted file mode 100644
index 4dcddcd..0000000
--- a/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java
+++ /dev/null
@@ -1,947 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.storage;
-
-import java.io.DataInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import javax.validation.constraints.NotNull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Context;
-import org.apache.flume.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.primitives.Ints;
-import com.google.common.primitives.Longs;
-
-import com.datatorrent.api.Component;
-import com.datatorrent.common.util.NameableThreadFactory;
-import com.datatorrent.flume.sink.Server;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * HDFSStorage stores and retrieves data from HDFS.
- * <p />
- * The properties that can be set on HDFSStorage are: <br />
- * baseDir - The base directory where the data is stored <br />
- * restore - Whether to restore the application state after a previous failure <br />
- * blockSize - The maximum size of each file to be created. <br />
- *
- * @since 0.9.3
- */
-public class HDFSStorage implements Storage, Configurable, Component<com.datatorrent.api.Context>
-{
- public static final int DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
- public static final String BASE_DIR_KEY = "baseDir";
- public static final String RESTORE_KEY = "restore";
- public static final String BLOCKSIZE = "blockSize";
- public static final String BLOCK_SIZE_MULTIPLE = "blockSizeMultiple";
- public static final String NUMBER_RETRY = "retryCount";
-
- private static final String OFFSET_SUFFIX = "-offsetFile";
- private static final String BOOK_KEEPING_FILE_OFFSET = "-bookKeepingOffsetFile";
- private static final String FLUSHED_IDENTITY_FILE = "flushedCounter";
- private static final String CLEAN_OFFSET_FILE = "cleanoffsetFile";
- private static final String FLUSHED_IDENTITY_FILE_TEMP = "flushedCounter.tmp";
- private static final String CLEAN_OFFSET_FILE_TEMP = "cleanoffsetFile.tmp";
- private static final int IDENTIFIER_SIZE = 8;
- private static final int DATA_LENGTH_BYTE_SIZE = 4;
-
- /**
- * Number of times the storage will try to get the filesystem
- */
- private int retryCount = 3;
- /**
- * The multiple of block size
- */
- private int blockSizeMultiple = 1;
- /**
- * Identifier for this storage.
- */
- @NotNull
- private String id;
- /**
- * The baseDir where the storage facility is going to create files.
- */
- @NotNull
- private String baseDir;
- /**
- * The block size to be used to create the storage files
- */
- private long blockSize;
- /**
-   * Whether to restore the state from a previous run instead of starting fresh
- */
- private boolean restore;
- /**
- * This identifies the current file number
- */
- private long currentWrittenFile;
- /**
- * This identifies the file number that has been flushed
- */
- private long flushedFileCounter;
- /**
- * The file that stores the fileCounter information
- */
- // private Path fileCounterFile;
- /**
- * The file that stores the flushed fileCounter information
- */
- private Path flushedCounterFile;
- private Path flushedCounterFileTemp;
- /**
- * This identifies the last cleaned file number
- */
- private long cleanedFileCounter;
- /**
- * The file that stores the clean file counter information
- */
- // private Path cleanFileCounterFile;
- /**
- * The file that stores the clean file offset information
- */
- private Path cleanFileOffsetFile;
- private Path cleanFileOffsetFileTemp;
- private FileSystem fs;
- private FSDataOutputStream dataStream;
- ArrayList<DataBlock> files2Commit = new ArrayList<DataBlock>();
- /**
- * The offset in the current opened file
- */
- private long fileWriteOffset;
- private FSDataInputStream readStream;
- private long retrievalOffset;
- private long retrievalFile;
- private int offset;
- private long flushedLong;
- private long flushedFileWriteOffset;
- private long bookKeepingFileOffset;
- private byte[] cleanedOffset = new byte[8];
- private long skipOffset;
- private long skipFile;
- private transient Path basePath;
- private ExecutorService storageExecutor;
- private byte[] currentData;
- private FSDataInputStream nextReadStream;
- private long nextFlushedLong;
- private long nextRetrievalFile;
- private byte[] nextRetrievalData;
-
- public HDFSStorage()
- {
- this.restore = true;
- }
-
- /**
-   * This reads the storage configuration (id, baseDir, restore, blockSize and retryCount) from the context
-   *
-   * @param ctx the Flume context carrying the configuration
- */
- @Override
- public void configure(Context ctx)
- {
- String tempId = ctx.getString(ID);
- if (tempId == null) {
- if (id == null) {
- throw new IllegalArgumentException("id can't be null.");
- }
- } else {
- id = tempId;
- }
-
- String tempBaseDir = ctx.getString(BASE_DIR_KEY);
- if (tempBaseDir != null) {
- baseDir = tempBaseDir;
- }
-
- restore = ctx.getBoolean(RESTORE_KEY, restore);
- Long tempBlockSize = ctx.getLong(BLOCKSIZE);
- if (tempBlockSize != null) {
- blockSize = tempBlockSize;
- }
- blockSizeMultiple = ctx.getInteger(BLOCK_SIZE_MULTIPLE, blockSizeMultiple);
- retryCount = ctx.getInteger(NUMBER_RETRY,retryCount);
- }
-
- /**
-   * This function reads the file at the given location and returns the bytes stored in the file
-   *
-   * @param path - the location of the file
-   * @return the bytes stored in the file
- * @throws IOException
- */
- byte[] readData(Path path) throws IOException
- {
- DataInputStream is = new DataInputStream(fs.open(path));
- byte[] bytes = new byte[is.available()];
- is.readFully(bytes);
- is.close();
- return bytes;
- }
-
- /**
- * This function writes the bytes to a file specified by the path
- *
- * @param path the file location
- * @param data the data to be written to the file
-   * @return the output stream, which the caller is expected to close
- * @throws IOException
- */
- private FSDataOutputStream writeData(Path path, byte[] data) throws IOException
- {
- FSDataOutputStream fsOutputStream;
- if (fs.getScheme().equals("file")) {
- // local FS does not support hflush and does not flush native stream
- fsOutputStream = new FSDataOutputStream(
- new FileOutputStream(Path.getPathWithoutSchemeAndAuthority(path).toString()), null);
- } else {
- fsOutputStream = fs.create(path);
- }
- fsOutputStream.write(data);
- return fsOutputStream;
- }
-
- private long calculateOffset(long fileOffset, long fileCounter)
- {
- return ((fileCounter << 32) | (fileOffset & 0xffffffffL));
- }
-
- @Override
- public byte[] store(Slice slice)
- {
- // logger.debug("store message ");
- int bytesToWrite = slice.length + DATA_LENGTH_BYTE_SIZE;
- if (currentWrittenFile < skipFile) {
- fileWriteOffset += bytesToWrite;
- if (fileWriteOffset >= bookKeepingFileOffset) {
- files2Commit.add(new DataBlock(null, bookKeepingFileOffset,
- new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), currentWrittenFile));
- currentWrittenFile++;
- if (fileWriteOffset > bookKeepingFileOffset) {
- fileWriteOffset = bytesToWrite;
- } else {
- fileWriteOffset = 0;
- }
- try {
- bookKeepingFileOffset = getFlushedFileWriteOffset(
- new Path(basePath, currentWrittenFile + BOOK_KEEPING_FILE_OFFSET));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- return null;
- }
-
- if (flushedFileCounter == currentWrittenFile && dataStream == null) {
- currentWrittenFile++;
- fileWriteOffset = 0;
- }
-
- if (flushedFileCounter == skipFile && skipFile != -1) {
- skipFile++;
- }
-
- if (fileWriteOffset + bytesToWrite < blockSize) {
- try {
- /* write length and the actual data to the file */
- if (fileWriteOffset == 0) {
- // writeData(flushedCounterFile, String.valueOf(currentWrittenFile).getBytes()).close();
- dataStream = writeData(new Path(basePath, String.valueOf(currentWrittenFile)),
- Ints.toByteArray(slice.length));
- dataStream.write(slice.buffer, slice.offset, slice.length);
- } else {
- dataStream.write(Ints.toByteArray(slice.length));
- dataStream.write(slice.buffer, slice.offset, slice.length);
- }
- fileWriteOffset += bytesToWrite;
-
- byte[] fileOffset = null;
- if ((currentWrittenFile > skipFile) || (currentWrittenFile == skipFile && fileWriteOffset > skipOffset)) {
- skipFile = -1;
- fileOffset = new byte[IDENTIFIER_SIZE];
- Server.writeLong(fileOffset, 0, calculateOffset(fileWriteOffset, currentWrittenFile));
- }
- return fileOffset;
- } catch (IOException ex) {
- logger.warn("Error while storing the bytes {}", ex.getMessage());
- closeFs();
- throw new RuntimeException(ex);
- }
- }
- DataBlock db = new DataBlock(dataStream, fileWriteOffset,
- new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), currentWrittenFile);
- db.close();
- files2Commit.add(db);
- fileWriteOffset = 0;
- ++currentWrittenFile;
- return store(slice);
- }
-
- /**
-   * @param b the byte array holding a little-endian encoded value
-   * @param startIndex the index of the least significant byte
-   * @return the four bytes starting at startIndex, read as an unsigned little-endian value
- */
- long byteArrayToLong(byte[] b, int startIndex)
- {
- final byte b1 = 0;
- return Longs.fromBytes(b1, b1, b1, b1, b[3 + startIndex], b[2 + startIndex], b[1 + startIndex], b[startIndex]);
- }
-
- @Override
- public byte[] retrieve(byte[] identifier)
- {
- skipFile = -1;
- skipOffset = 0;
- logger.debug("retrieve with address {}", Arrays.toString(identifier));
- // flushing the last incomplete flushed file
- closeUnflushedFiles();
-
- retrievalOffset = byteArrayToLong(identifier, 0);
- retrievalFile = byteArrayToLong(identifier, offset);
-
- if (retrievalFile == 0 && retrievalOffset == 0 && currentWrittenFile == 0 && fileWriteOffset == 0) {
- skipOffset = 0;
- return null;
- }
-
- // making sure that the deleted address is not requested again
- if (retrievalFile != 0 || retrievalOffset != 0) {
- long cleanedFile = byteArrayToLong(cleanedOffset, offset);
- if (retrievalFile < cleanedFile || (retrievalFile == cleanedFile &&
- retrievalOffset < byteArrayToLong(cleanedOffset, 0))) {
-        logger.warn("The requested address has already been deleted: retrievalFile={}, cleanedFile={}, retrievalOffset={}, " +
- "cleanedOffset={}", retrievalFile, cleanedFile, retrievalOffset, byteArrayToLong(cleanedOffset, 0));
- closeFs();
- throw new IllegalArgumentException(String.format("The data for address %s has already been deleted",
- Arrays.toString(identifier)));
- }
- }
-
- // we have just started
- if (retrievalFile == 0 && retrievalOffset == 0) {
- retrievalFile = byteArrayToLong(cleanedOffset, offset);
- retrievalOffset = byteArrayToLong(cleanedOffset, 0);
- }
-
- if ((retrievalFile > flushedFileCounter)) {
- skipFile = retrievalFile;
- skipOffset = retrievalOffset;
- retrievalFile = -1;
- return null;
- }
- if ((retrievalFile == flushedFileCounter && retrievalOffset >= flushedFileWriteOffset)) {
- skipFile = retrievalFile;
- skipOffset = retrievalOffset - flushedFileWriteOffset;
- retrievalFile = -1;
- return null;
- }
-
- try {
- if (readStream != null) {
- readStream.close();
- readStream = null;
- }
- Path path = new Path(basePath, String.valueOf(retrievalFile));
- if (!fs.exists(path)) {
- retrievalFile = -1;
- closeFs();
- throw new RuntimeException(String.format("File %s does not exist", path.toString()));
- }
-
- byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
- flushedLong = Server.readLong(flushedOffset, 0);
- while (retrievalOffset >= flushedLong && retrievalFile < flushedFileCounter) {
- retrievalOffset -= flushedLong;
- retrievalFile++;
- flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
- flushedLong = Server.readLong(flushedOffset, 0);
- }
-
- if (retrievalOffset >= flushedLong) {
- logger.warn("data not flushed for the given identifier");
- retrievalFile = -1;
- return null;
- }
- synchronized (HDFSStorage.this) {
- if (nextReadStream != null) {
- nextReadStream.close();
- nextReadStream = null;
- }
- }
- currentData = null;
- path = new Path(basePath, String.valueOf(retrievalFile));
- //readStream = new FSDataInputStream(fs.open(path));
- currentData = readData(path);
- //readStream.seek(retrievalOffset);
- storageExecutor.submit(getNextStream());
- return retrieveHelper();
- } catch (IOException e) {
- closeFs();
- throw new RuntimeException(e);
- }
- }
-
- private byte[] retrieveHelper() throws IOException
- {
- int tempRetrievalOffset = (int)retrievalOffset;
- int length = Ints.fromBytes(currentData[tempRetrievalOffset], currentData[tempRetrievalOffset + 1],
- currentData[tempRetrievalOffset + 2], currentData[tempRetrievalOffset + 3]);
- byte[] data = new byte[length + IDENTIFIER_SIZE];
- System.arraycopy(currentData, tempRetrievalOffset + 4, data, IDENTIFIER_SIZE, length);
- retrievalOffset += length + DATA_LENGTH_BYTE_SIZE;
- if (retrievalOffset >= flushedLong) {
- Server.writeLong(data, 0, calculateOffset(0, retrievalFile + 1));
- } else {
- Server.writeLong(data, 0, calculateOffset(retrievalOffset, retrievalFile));
- }
- return data;
- }
-
- @Override
- public byte[] retrieveNext()
- {
- if (retrievalFile == -1) {
- closeFs();
- throw new RuntimeException("Call retrieve first");
- }
-
- if (retrievalFile > flushedFileCounter) {
- logger.warn("data is not flushed");
- return null;
- }
-
- try {
- if (currentData == null) {
- synchronized (HDFSStorage.this) {
- if (nextRetrievalData != null && (retrievalFile == nextRetrievalFile)) {
- currentData = nextRetrievalData;
- flushedLong = nextFlushedLong;
- nextRetrievalData = null;
- } else {
- currentData = null;
- currentData = readData(new Path(basePath, String.valueOf(retrievalFile)));
- byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
- flushedLong = Server.readLong(flushedOffset, 0);
- }
- }
- storageExecutor.submit(getNextStream());
- }
-
- if (retrievalOffset >= flushedLong) {
- retrievalFile++;
- retrievalOffset = 0;
-
- if (retrievalFile > flushedFileCounter) {
- logger.warn("data is not flushed");
- return null;
- }
-
- //readStream.close();
- // readStream = new FSDataInputStream(fs.open(new Path(basePath, String.valueOf(retrievalFile))));
- // byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
- // flushedLong = Server.readLong(flushedOffset, 0);
-
- synchronized (HDFSStorage.this) {
- if (nextRetrievalData != null && (retrievalFile == nextRetrievalFile)) {
- currentData = nextRetrievalData;
- flushedLong = nextFlushedLong;
- nextRetrievalData = null;
- } else {
- currentData = null;
- currentData = readData(new Path(basePath, String.valueOf(retrievalFile)));
- byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
- flushedLong = Server.readLong(flushedOffset, 0);
- }
- }
- storageExecutor.submit(getNextStream());
- }
- //readStream.seek(retrievalOffset);
- return retrieveHelper();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter")
- public void clean(byte[] identifier)
- {
- logger.info("clean {}", Arrays.toString(identifier));
- long cleanFileIndex = byteArrayToLong(identifier, offset);
-
- long cleanFileOffset = byteArrayToLong(identifier, 0);
- if (flushedFileCounter == -1) {
- identifier = new byte[8];
- } else if (cleanFileIndex > flushedFileCounter ||
- (cleanFileIndex == flushedFileCounter && cleanFileOffset >= flushedFileWriteOffset)) {
- // This is to make sure that we clean only the data that is flushed
- cleanFileIndex = flushedFileCounter;
- cleanFileOffset = flushedFileWriteOffset;
- Server.writeLong(identifier, 0, calculateOffset(cleanFileOffset, cleanFileIndex));
- }
- cleanedOffset = identifier;
-
- try {
- writeData(cleanFileOffsetFileTemp, identifier).close();
- fs.rename(cleanFileOffsetFileTemp, cleanFileOffsetFile);
- if (cleanedFileCounter >= cleanFileIndex) {
- return;
- }
- do {
- Path path = new Path(basePath, String.valueOf(cleanedFileCounter));
- if (fs.exists(path) && fs.isFile(path)) {
- fs.delete(path, false);
- }
- path = new Path(basePath, cleanedFileCounter + OFFSET_SUFFIX);
- if (fs.exists(path) && fs.isFile(path)) {
- fs.delete(path, false);
- }
- path = new Path(basePath, cleanedFileCounter + BOOK_KEEPING_FILE_OFFSET);
- if (fs.exists(path) && fs.isFile(path)) {
- fs.delete(path, false);
- }
- logger.info("deleted file {}", cleanedFileCounter);
- ++cleanedFileCounter;
- } while (cleanedFileCounter < cleanFileIndex);
- // writeData(cleanFileCounterFile, String.valueOf(cleanedFileCounter).getBytes()).close();
-
- } catch (IOException e) {
- logger.warn("not able to close the streams {}", e.getMessage());
- closeFs();
- throw new RuntimeException(e);
- }
- }
-
- /**
- * This is used mainly for cleaning up of counter files created
- */
- void cleanHelperFiles()
- {
- try {
- fs.delete(basePath, true);
- } catch (IOException e) {
- logger.warn(e.getMessage());
- }
- }
-
- private void closeUnflushedFiles()
- {
- try {
- files2Commit.clear();
- // closing the stream
- if (dataStream != null) {
- dataStream.close();
- dataStream = null;
- // currentWrittenFile++;
- // fileWriteOffset = 0;
- }
-
- if (!fs.exists(new Path(basePath, currentWrittenFile + OFFSET_SUFFIX))) {
- fs.delete(new Path(basePath, String.valueOf(currentWrittenFile)), false);
- }
-
- if (fs.exists(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX))) {
- // This means that flush was called
- flushedFileWriteOffset = getFlushedFileWriteOffset(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX));
- bookKeepingFileOffset = getFlushedFileWriteOffset(
- new Path(basePath, flushedFileCounter + BOOK_KEEPING_FILE_OFFSET));
- }
-
- if (flushedFileCounter != -1) {
- currentWrittenFile = flushedFileCounter;
- fileWriteOffset = flushedFileWriteOffset;
- } else {
- currentWrittenFile = 0;
- fileWriteOffset = 0;
- }
-
- flushedLong = 0;
-
- } catch (IOException e) {
- closeFs();
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void flush()
- {
- nextReadStream = null;
- StringBuilder builder = new StringBuilder();
- Iterator<DataBlock> itr = files2Commit.iterator();
- DataBlock db;
- try {
- while (itr.hasNext()) {
- db = itr.next();
- db.updateOffsets();
- builder.append(db.fileName).append(", ");
- }
- files2Commit.clear();
-
- if (dataStream != null) {
- dataStream.hflush();
- writeData(flushedCounterFileTemp, String.valueOf(currentWrittenFile).getBytes()).close();
- fs.rename(flushedCounterFileTemp, flushedCounterFile);
- updateFlushedOffset(new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), fileWriteOffset);
- flushedFileWriteOffset = fileWriteOffset;
- builder.append(currentWrittenFile);
- }
- logger.debug("flushed files {}", builder.toString());
- } catch (IOException ex) {
- logger.warn("not able to close the stream {}", ex.getMessage());
- closeFs();
- throw new RuntimeException(ex);
- }
- flushedFileCounter = currentWrittenFile;
- // logger.debug("flushedFileCounter in flush {}",flushedFileCounter);
- }
-
- /**
- * This updates the flushed offset
- */
- private void updateFlushedOffset(Path file, long bytesWritten)
- {
- byte[] lastStoredOffset = new byte[IDENTIFIER_SIZE];
- Server.writeLong(lastStoredOffset, 0, bytesWritten);
- try {
- writeData(file, lastStoredOffset).close();
- } catch (IOException e) {
- try {
- if (!Arrays.equals(readData(file), lastStoredOffset)) {
- closeFs();
- throw new RuntimeException(e);
- }
- } catch (Exception e1) {
- closeFs();
- throw new RuntimeException(e1);
- }
- }
- }
-
- public int getBlockSizeMultiple()
- {
- return blockSizeMultiple;
- }
-
- public void setBlockSizeMultiple(int blockSizeMultiple)
- {
- this.blockSizeMultiple = blockSizeMultiple;
- }
-
- /**
- * @return the baseDir
- */
- public String getBaseDir()
- {
- return baseDir;
- }
-
- /**
- * @param baseDir the baseDir to set
- */
- public void setBaseDir(String baseDir)
- {
- this.baseDir = baseDir;
- }
-
- /**
- * @return the id
- */
- public String getId()
- {
- return id;
- }
-
- /**
- * @param id the id to set
- */
- public void setId(String id)
- {
- this.id = id;
- }
-
- /**
- * @return the blockSize
- */
- public long getBlockSize()
- {
- return blockSize;
- }
-
- /**
- * @param blockSize the blockSize to set
- */
- public void setBlockSize(long blockSize)
- {
- this.blockSize = blockSize;
- }
-
- /**
- * @return the restore
- */
- public boolean isRestore()
- {
- return restore;
- }
-
- /**
- * @param restore the restore to set
- */
- public void setRestore(boolean restore)
- {
- this.restore = restore;
- }
-
- class DataBlock
- {
- FSDataOutputStream dataStream;
- long dataOffset;
- Path path2FlushedData;
- long fileName;
- private Path bookKeepingPath;
-
- DataBlock(FSDataOutputStream stream, long bytesWritten, Path path2FlushedData, long fileName)
- {
- this.dataStream = stream;
- this.dataOffset = bytesWritten;
- this.path2FlushedData = path2FlushedData;
- this.fileName = fileName;
- }
-
- public void close()
- {
- if (dataStream != null) {
- try {
- dataStream.close();
- bookKeepingPath = new Path(basePath, fileName + BOOK_KEEPING_FILE_OFFSET);
- updateFlushedOffset(bookKeepingPath, dataOffset);
- } catch (IOException ex) {
- logger.warn("not able to close the stream {}", ex.getMessage());
- closeFs();
- throw new RuntimeException(ex);
- }
- }
- }
-
- public void updateOffsets() throws IOException
- {
- updateFlushedOffset(path2FlushedData, dataOffset);
- if (bookKeepingPath != null && fs.exists(bookKeepingPath)) {
- fs.delete(bookKeepingPath, false);
- }
- }
-
- }
-
- private static final Logger logger = LoggerFactory.getLogger(HDFSStorage.class);
-
- @Override
- public void setup(com.datatorrent.api.Context context)
- {
- Configuration conf = new Configuration();
- if (baseDir == null) {
- baseDir = conf.get("hadoop.tmp.dir");
- if (baseDir == null || baseDir.isEmpty()) {
- throw new IllegalArgumentException("baseDir cannot be null.");
- }
- }
- offset = 4;
- skipOffset = -1;
- skipFile = -1;
- int tempRetryCount = 0;
- while (tempRetryCount < retryCount && fs == null) {
- try {
- fs = FileSystem.newInstance(conf);
- tempRetryCount++;
- } catch (Throwable throwable) {
-        logger.warn("Not able to get the file system", throwable);
- }
- }
-
- try {
- Path path = new Path(baseDir);
- basePath = new Path(path, id);
- if (fs == null) {
- fs = FileSystem.newInstance(conf);
- }
- if (!fs.exists(path)) {
- closeFs();
- throw new RuntimeException(String.format("baseDir passed (%s) doesn't exist.", baseDir));
- }
- if (!fs.isDirectory(path)) {
- closeFs();
- throw new RuntimeException(String.format("baseDir passed (%s) is not a directory.", baseDir));
- }
- if (!restore) {
- fs.delete(basePath, true);
- }
- if (!fs.exists(basePath) || !fs.isDirectory(basePath)) {
- fs.mkdirs(basePath);
- }
-
- if (blockSize == 0) {
- blockSize = fs.getDefaultBlockSize(new Path(basePath, "tempData"));
- }
- if (blockSize == 0) {
- blockSize = DEFAULT_BLOCK_SIZE;
- }
-
- blockSize = blockSizeMultiple * blockSize;
-
- currentWrittenFile = 0;
- cleanedFileCounter = -1;
- retrievalFile = -1;
- // fileCounterFile = new Path(basePath, IDENTITY_FILE);
- flushedFileCounter = -1;
- // cleanFileCounterFile = new Path(basePath, CLEAN_FILE);
- cleanFileOffsetFile = new Path(basePath, CLEAN_OFFSET_FILE);
- cleanFileOffsetFileTemp = new Path(basePath, CLEAN_OFFSET_FILE_TEMP);
- flushedCounterFile = new Path(basePath, FLUSHED_IDENTITY_FILE);
- flushedCounterFileTemp = new Path(basePath, FLUSHED_IDENTITY_FILE_TEMP);
-
- if (restore) {
- //
- // if (fs.exists(fileCounterFile) && fs.isFile(fileCounterFile)) {
- // //currentWrittenFile = Long.valueOf(new String(readData(fileCounterFile)));
- // }
-
- if (fs.exists(cleanFileOffsetFile) && fs.isFile(cleanFileOffsetFile)) {
- cleanedOffset = readData(cleanFileOffsetFile);
- }
-
- if (fs.exists(flushedCounterFile) && fs.isFile(flushedCounterFile)) {
- String strFlushedFileCounter = new String(readData(flushedCounterFile));
- if (strFlushedFileCounter.isEmpty()) {
- logger.warn("empty flushed file");
- } else {
- flushedFileCounter = Long.valueOf(strFlushedFileCounter);
- flushedFileWriteOffset = getFlushedFileWriteOffset(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX));
- bookKeepingFileOffset = getFlushedFileWriteOffset(
- new Path(basePath, flushedFileCounter + BOOK_KEEPING_FILE_OFFSET));
- }
-
- }
- }
- fileWriteOffset = flushedFileWriteOffset;
- currentWrittenFile = flushedFileCounter;
- cleanedFileCounter = byteArrayToLong(cleanedOffset, offset) - 1;
- if (currentWrittenFile == -1) {
- ++currentWrittenFile;
- fileWriteOffset = 0;
- }
-
- } catch (IOException io) {
- throw new RuntimeException(io);
- }
- storageExecutor = Executors.newSingleThreadExecutor(new NameableThreadFactory("StorageHelper"));
- }
-
- private void closeFs()
- {
- if (fs != null) {
- try {
- fs.close();
- fs = null;
- } catch (IOException e) {
- logger.debug(e.getMessage());
- }
- }
- }
-
- private long getFlushedFileWriteOffset(Path filePath) throws IOException
- {
- if (flushedFileCounter != -1 && fs.exists(filePath)) {
- byte[] flushedFileOffsetByte = readData(filePath);
- if (flushedFileOffsetByte != null && flushedFileOffsetByte.length == 8) {
- return Server.readLong(flushedFileOffsetByte, 0);
- }
- }
- return 0;
- }
-
- @Override
- public void teardown()
- {
- logger.debug("called teardown");
- try {
- if (readStream != null) {
- readStream.close();
- }
- synchronized (HDFSStorage.this) {
- if (nextReadStream != null) {
- nextReadStream.close();
- }
- }
-
- } catch (IOException e) {
- throw new RuntimeException(e);
- } finally {
- closeUnflushedFiles();
- storageExecutor.shutdown();
- }
-
- }
-
- private Runnable getNextStream()
- {
- return new Runnable()
- {
- @Override
- public void run()
- {
- try {
- synchronized (HDFSStorage.this) {
- nextRetrievalFile = retrievalFile + 1;
- if (nextRetrievalFile > flushedFileCounter) {
- nextRetrievalData = null;
- return;
- }
- Path path = new Path(basePath, String.valueOf(nextRetrievalFile));
- Path offsetPath = new Path(basePath, nextRetrievalFile + OFFSET_SUFFIX);
- nextRetrievalData = null;
- nextRetrievalData = readData(path);
- byte[] flushedOffset = readData(offsetPath);
- nextFlushedLong = Server.readLong(flushedOffset, 0);
- }
- } catch (Throwable e) {
- logger.warn("failed to preload the next file in the storage executor", e);
- }
- }
- };
- }
-
-}
-
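The deleted HDFSStorage above persists its flushed write offset in small eight-byte bookkeeping files, round-tripping the value through Server.writeLong and Server.readLong. The bodies of those helpers are not part of this commit, so the following is only a sketch of such a fixed-width encoding, assuming a big-endian byte order:

    // Encode the long v into buf at off, most significant byte first.
    static void writeLong(byte[] buf, int off, long v)
    {
      for (int i = 7; i >= 0; i--) {
        buf[off + i] = (byte)v;  // store the low byte, then shift the rest down
        v >>>= 8;
      }
    }

    // Decode eight bytes from buf at off back into a long.
    static long readLong(byte[] buf, int off)
    {
      long v = 0;
      for (int i = 0; i < 8; i++) {
        v = (v << 8) | (buf[off + i] & 0xffL);
      }
      return v;
    }

Either helper inverts the other, which is what updateFlushedOffset relies on when it verifies a failed write by reading the offset file back and comparing the bytes.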
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/storage/Storage.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/Storage.java b/flume/src/main/java/com/datatorrent/flume/storage/Storage.java
deleted file mode 100644
index 5130f3c..0000000
--- a/flume/src/main/java/com/datatorrent/flume/storage/Storage.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.storage;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * <p>Storage interface.</p>
- *
- * @since 0.9.2
- */
-public interface Storage
-{
- /**
- * Key in the context for the unique identifier of the storage, which may be used to recover from failure.
- */
- String ID = "id";
-
- /**
- * This stores the bytes and returns the unique identifier to retrieve these bytes.
- *
- * @param bytes data to store
- * @return unique identifier which can later be used to retrieve the stored bytes
- */
- byte[] store(Slice bytes);
-
- /**
- * This returns the data bytes for the given identifier along with the identifier for the next data bytes. <br/>
- * The first eight bytes contain the identifier and the remaining bytes contain the data.
- *
- * @param identifier identifier of the data to read
- * @return first eight bytes: identifier of the next data; remaining bytes: the data itself
- */
- byte[] retrieve(byte[] identifier);
-
- /**
- * This returns the data bytes along with the identifier for the next data bytes. The identifier of the current
- * data is derived from the preceding retrieve call and the number of retrieveNext calls made since. <br/>
- * The first eight bytes contain the identifier and the remaining bytes contain the data.
- *
- * @return first eight bytes: identifier of the next data; remaining bytes: the data itself
- */
- byte[] retrieveNext();
-
- /**
- * This is used to clean up the files identified by the given identifier.
- *
- * @param identifier identifier of the data to clean up
- */
- void clean(byte[] identifier);
-
- /**
- * This flushes any buffered data to the underlying storage.
- */
- void flush();
-
-}
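To make the retrieve/retrieveNext contract concrete: every returned array carries the identifier of the next item in its first eight bytes, with the payload after it. Below is a minimal consumer sketch under that contract; storage, startIdentifier and process are hypothetical placeholders, and the assumption that clean releases data up to the given identifier is not stated by the interface itself:

    import java.util.Arrays;

    // Drain the storage starting from a known identifier.
    byte[] lastIdentifier = startIdentifier;
    byte[] result = storage.retrieve(startIdentifier);
    while (result != null) {
      lastIdentifier = Arrays.copyOfRange(result, 0, 8);       // identifier prefix
      process(Arrays.copyOfRange(result, 8, result.length));   // payload after the prefix
      result = storage.retrieveNext();                         // advance to the next item
    }
    storage.clean(lastIdentifier);  // assumption: releases data up to this identifier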
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java
new file mode 100644
index 0000000..619a625
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.discovery;
+
+import java.util.Collection;
+
+/**
+ * When a DTFlumeSink server instance binds to the network interface, it can publish
+ * its whereabouts by invoking the advertise method on the Discovery object. Similarly,
+ * when it stops accepting connections, it can announce that by invoking
+ * unadvertise.<p />
+ * Interested parties can call the discover method to get the list of addresses where
+ * they can find an available DTFlumeSink server instance.
+ *
+ * @param <T> - Type of the objects which can be discovered
+ * @since 0.9.3
+ */
+public interface Discovery<T>
+{
+ /**
+ * Recall the previously published address as it's no longer valid.
+ *
+ * @param service service whose address should be withdrawn
+ */
+ void unadvertise(Service<T> service);
+
+ /**
+ * Advertise the host/port address where DTFlumeSink is accepting client connections.
+ *
+ * @param service service whose address should be published
+ */
+ void advertise(Service<T> service);
+
+ /**
+ * Discover all the addresses which are actively accepting client connections.
+ *
+ * @return active server addresses which can accept connections
+ */
+ Collection<Service<T>> discover();
+
+ interface Service<T>
+ {
+ String getHost();
+
+ int getPort();
+
+ T getPayload();
+
+ String getId();
+
+ }
+
+}
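For reference, the contract above is small enough that an in-memory registry satisfies it; the sketch below (same package assumed, suitable only for single-JVM tests, not part of this commit) shows the three operations against a concurrent map:

    import java.util.Collection;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Trivial Discovery backed by a concurrent map keyed on service id.
    public class InMemoryDiscovery<T> implements Discovery<T>
    {
      private final Map<String, Service<T>> services = new ConcurrentHashMap<String, Service<T>>();

      @Override
      public void advertise(Service<T> service)
      {
        services.put(service.getId(), service);  // subsequent discover() calls will see it
      }

      @Override
      public void unadvertise(Service<T> service)
      {
        services.remove(service.getId());  // the address is no longer valid
      }

      @Override
      public Collection<Service<T>> discover()
      {
        return services.values();  // live view of currently advertised services
      }
    }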
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java
new file mode 100644
index 0000000..9a7dd3c
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java
@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.discovery;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import javax.validation.constraints.NotNull;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.ObjectWriter;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.curator.x.discovery.details.InstanceSerializer;
+import org.apache.flume.conf.Configurable;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.Component;
+
+/**
+ * <p>ZKAssistedDiscovery class.</p>
+ *
+ * @since 0.9.3
+ */
+public class ZKAssistedDiscovery implements Discovery<byte[]>,
+ Component<com.datatorrent.api.Context>, Configurable, Serializable
+{
+ @NotNull
+ private String serviceName;
+ @NotNull
+ private String connectionString;
+ @NotNull
+ private String basePath;
+ private int connectionTimeoutMillis;
+ private int connectionRetryCount;
+ private int conntectionRetrySleepMillis;
+ private transient InstanceSerializerFactory instanceSerializerFactory;
+ private transient CuratorFramework curatorFramework;
+ private transient ServiceDiscovery<byte[]> discovery;
+
+ public ZKAssistedDiscovery()
+ {
+ this.serviceName = "DTFlume";
+ this.conntectionRetrySleepMillis = 500;
+ this.connectionRetryCount = 10;
+ this.connectionTimeoutMillis = 1000;
+ }
+
+ @Override
+ public void unadvertise(Service<byte[]> service)
+ {
+ doAdvertise(service, false);
+ }
+
+ @Override
+ public void advertise(Service<byte[]> service)
+ {
+ doAdvertise(service, true);
+ }
+
+ public void doAdvertise(Service<byte[]> service, boolean flag)
+ {
+ try {
+ new EnsurePath(basePath).ensure(curatorFramework.getZookeeperClient());
+
+ ServiceInstance<byte[]> instance = getInstance(service);
+ if (flag) {
+ discovery.registerService(instance);
+ } else {
+ discovery.unregisterService(instance);
+ }
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public Collection<Service<byte[]>> discover()
+ {
+ try {
+ new EnsurePath(basePath).ensure(curatorFramework.getZookeeperClient());
+
+ Collection<ServiceInstance<byte[]>> services = discovery.queryForInstances(serviceName);
+ ArrayList<Service<byte[]>> returnable = new ArrayList<Service<byte[]>>(services.size());
+ for (final ServiceInstance<byte[]> service : services) {
+ returnable.add(new Service<byte[]>()
+ {
+ @Override
+ public String getHost()
+ {
+ return service.getAddress();
+ }
+
+ @Override
+ public int getPort()
+ {
+ return service.getPort();
+ }
+
+ @Override
+ public byte[] getPayload()
+ {
+ return service.getPayload();
+ }
+
+ @Override
+ public String getId()
+ {
+ return service.getId();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "{" + getId() + " => " + getHost() + ':' + getPort() + '}';
+ }
+
+ });
+ }
+ return returnable;
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ZKAssistedDiscovery{" + "serviceName=" + serviceName + ", connectionString=" + connectionString +
+ ", basePath=" + basePath + ", connectionTimeoutMillis=" + connectionTimeoutMillis + ", connectionRetryCount=" +
+ connectionRetryCount + ", conntectionRetrySleepMillis=" + conntectionRetrySleepMillis + '}';
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int hash = 7;
+ hash = 47 * hash + this.serviceName.hashCode();
+ hash = 47 * hash + this.connectionString.hashCode();
+ hash = 47 * hash + this.basePath.hashCode();
+ hash = 47 * hash + this.connectionTimeoutMillis;
+ hash = 47 * hash + this.connectionRetryCount;
+ hash = 47 * hash + this.conntectionRetrySleepMillis;
+ return hash;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final ZKAssistedDiscovery other = (ZKAssistedDiscovery)obj;
+ if (!this.serviceName.equals(other.serviceName)) {
+ return false;
+ }
+ if (!this.connectionString.equals(other.connectionString)) {
+ return false;
+ }
+ if (!this.basePath.equals(other.basePath)) {
+ return false;
+ }
+ if (this.connectionTimeoutMillis != other.connectionTimeoutMillis) {
+ return false;
+ }
+ if (this.connectionRetryCount != other.connectionRetryCount) {
+ return false;
+ }
+ if (this.conntectionRetrySleepMillis != other.conntectionRetrySleepMillis) {
+ return false;
+ }
+ return true;
+ }
+
+ ServiceInstance<byte[]> getInstance(Service<byte[]> service) throws Exception
+ {
+ return ServiceInstance.<byte[]>builder()
+ .name(serviceName)
+ .address(service.getHost())
+ .port(service.getPort())
+ .id(service.getId())
+ .payload(service.getPayload())
+ .build();
+ }
+
+ private ServiceDiscovery<byte[]> getDiscovery(CuratorFramework curatorFramework)
+ {
+ return ServiceDiscoveryBuilder.builder(byte[].class)
+ .basePath(basePath)
+ .client(curatorFramework)
+ .serializer(instanceSerializerFactory.getInstanceSerializer(
+ new TypeReference<ServiceInstance<byte[]>>()
+ {})).build();
+ }
+
+ /**
+ * @return the instanceSerializerFactory
+ */
+ InstanceSerializerFactory getInstanceSerializerFactory()
+ {
+ return instanceSerializerFactory;
+ }
+
+ /**
+ * @return the connectionString
+ */
+ public String getConnectionString()
+ {
+ return connectionString;
+ }
+
+ /**
+ * @param connectionString the connectionString to set
+ */
+ public void setConnectionString(String connectionString)
+ {
+ this.connectionString = connectionString;
+ }
+
+ /**
+ * @return the basePath
+ */
+ public String getBasePath()
+ {
+ return basePath;
+ }
+
+ /**
+ * @param basePath the basePath to set
+ */
+ public void setBasePath(String basePath)
+ {
+ this.basePath = basePath;
+ }
+
+ /**
+ * @return the connectionTimeoutMillis
+ */
+ public int getConnectionTimeoutMillis()
+ {
+ return connectionTimeoutMillis;
+ }
+
+ /**
+ * @param connectionTimeoutMillis the connectionTimeoutMillis to set
+ */
+ public void setConnectionTimeoutMillis(int connectionTimeoutMillis)
+ {
+ this.connectionTimeoutMillis = connectionTimeoutMillis;
+ }
+
+ /**
+ * @return the connectionRetryCount
+ */
+ public int getConnectionRetryCount()
+ {
+ return connectionRetryCount;
+ }
+
+ /**
+ * @param connectionRetryCount the connectionRetryCount to set
+ */
+ public void setConnectionRetryCount(int connectionRetryCount)
+ {
+ this.connectionRetryCount = connectionRetryCount;
+ }
+
+ /**
+ * @return the conntectionRetrySleepMillis
+ */
+ public int getConntectionRetrySleepMillis()
+ {
+ return conntectionRetrySleepMillis;
+ }
+
+ /**
+ * @param conntectionRetrySleepMillis the conntectionRetrySleepMillis to set
+ */
+ public void setConntectionRetrySleepMillis(int conntectionRetrySleepMillis)
+ {
+ this.conntectionRetrySleepMillis = conntectionRetrySleepMillis;
+ }
+
+ /**
+ * @return the serviceName
+ */
+ public String getServiceName()
+ {
+ return serviceName;
+ }
+
+ /**
+ * @param serviceName the serviceName to set
+ */
+ public void setServiceName(String serviceName)
+ {
+ this.serviceName = serviceName;
+ }
+
+ @Override
+ public void configure(org.apache.flume.Context context)
+ {
+ serviceName = context.getString("serviceName", "DTFlume");
+ connectionString = context.getString("connectionString");
+ basePath = context.getString("basePath");
+
+ connectionTimeoutMillis = context.getInteger("connectionTimeoutMillis", 1000);
+ connectionRetryCount = context.getInteger("connectionRetryCount", 10);
+ conntectionRetrySleepMillis = context.getInteger("connectionRetrySleepMillis", 500);
+ }
+
+ @Override
+ public void setup(com.datatorrent.api.Context context)
+ {
+ ObjectMapper om = new ObjectMapper();
+ instanceSerializerFactory = new InstanceSerializerFactory(om.reader(), om.writer());
+
+ curatorFramework = CuratorFrameworkFactory.builder()
+ .connectionTimeoutMs(connectionTimeoutMillis)
+ .retryPolicy(new RetryNTimes(connectionRetryCount, conntectionRetrySleepMillis))
+ .connectString(connectionString)
+ .build();
+ curatorFramework.start();
+
+ discovery = getDiscovery(curatorFramework);
+ try {
+ discovery.start();
+ } catch (Exception ex) {
+ throw Throwables.propagate(ex);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ try {
+ discovery.close();
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ } finally {
+ curatorFramework.close();
+ curatorFramework = null;
+ }
+ }
+
+ public class InstanceSerializerFactory
+ {
+ private final ObjectReader objectReader;
+ private final ObjectWriter objectWriter;
+
+ InstanceSerializerFactory(ObjectReader objectReader, ObjectWriter objectWriter)
+ {
+ this.objectReader = objectReader;
+ this.objectWriter = objectWriter;
+ }
+
+ public <T> InstanceSerializer<T> getInstanceSerializer(
+ TypeReference<ServiceInstance<T>> typeReference)
+ {
+ return new JacksonInstanceSerializer<T>(objectReader, objectWriter, typeReference);
+ }
+
+ final class JacksonInstanceSerializer<T> implements InstanceSerializer<T>
+ {
+ private final TypeReference<ServiceInstance<T>> typeRef;
+ private final ObjectWriter objectWriter;
+ private final ObjectReader objectReader;
+
+ JacksonInstanceSerializer(ObjectReader objectReader, ObjectWriter objectWriter,
+ TypeReference<ServiceInstance<T>> typeRef)
+ {
+ this.objectReader = objectReader;
+ this.objectWriter = objectWriter;
+ this.typeRef = typeRef;
+ }
+
+ @Override
+ public ServiceInstance<T> deserialize(byte[] bytes) throws Exception
+ {
+ return objectReader.withType(typeRef).readValue(bytes);
+ }
+
+ @Override
+ public byte[] serialize(ServiceInstance<T> serviceInstance) throws Exception
+ {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ objectWriter.writeValue(out, serviceInstance);
+ return out.toByteArray();
+ }
+
+ }
+
+ }
+
+ private static final long serialVersionUID = 201401221145L;
+ private static final Logger logger = LoggerFactory.getLogger(ZKAssistedDiscovery.class);
+}
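A hedged usage sketch for the class above: the connection string and base path below are placeholders, and null is passed to setup because the implementation shown does not read its context argument. In a Flume agent the same settings arrive through configure() under the keys serviceName, connectionString, basePath, connectionTimeoutMillis, connectionRetryCount and connectionRetrySleepMillis.

    // Query ZooKeeper-assisted discovery for live DTFlumeSink servers.
    ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
    discovery.setConnectionString("localhost:2181");  // placeholder ZooKeeper quorum
    discovery.setBasePath("/flume/discovery");        // placeholder registration path
    discovery.setup(null);  // context is unused by this implementation
    try {
      for (Discovery.Service<byte[]> service : discovery.discover()) {
        System.out.println(service);  // the anonymous Service prints {id => host:port}
      }
    } finally {
      discovery.teardown();
    }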