You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2009/02/27 02:32:02 UTC
svn commit: r748369 - in /hadoop/core/trunk/src/contrib/chukwa/src:
java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/
java/org/apache/hadoop/chukwa/datacollection/agent/
test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/
Author: eyang
Date: Fri Feb 27 01:32:01 2009
New Revision: 748369
URL: http://svn.apache.org/viewvc?rev=748369&view=rev
Log:
HADOOP-4893. Added a grace period during which the adaptor waits for a late-arriving data stream.
Added:
hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
Modified:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=748369&r1=748368&r2=748369&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java Fri Feb 27 01:32:01 2009
@@ -51,9 +51,12 @@
public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024 ;
public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE ;
public static int MAX_RETRIES = 300;
+ public static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes
+
protected static Configuration conf = null;
private int attempts = 0;
-
+ private long gracefulPeriodExpired = 0l;
+ private boolean adaptorInError = false;
File toWatch;
/**
* next PHYSICAL offset to read
@@ -155,22 +158,47 @@
*/
public synchronized boolean tailFile(ChunkReceiver eq) throws InterruptedException {
boolean hasMoreData = false;
+
+
try {
- if(!toWatch.exists() && attempts>MAX_RETRIES) {
- log.warn("Adaptor|" + adaptorID +"| File does not exist: "+toWatch.getAbsolutePath()+", streaming policy expired. File removed from streaming.");
- ChukwaAgent agent = ChukwaAgent.getAgent();
- if (agent != null) {
- agent.stopAdaptor(adaptorID, false);
- } else {
- log.info("Agent is null, running in default mode");
- }
- tailer.stopWatchingFile(this);
- return false;
- } else if(!toWatch.exists()) {
- log.warn("failed to stream data for: "+toWatch.getAbsolutePath()+", attempt: "+attempts+" of "+MAX_RETRIES);
- attempts++;
- return false; //no more data
+ if( (adaptorInError == true) && (System.currentTimeMillis() > gracefulPeriodExpired)) {
+ if (!toWatch.exists()) {
+ log.warn("Adaptor|" + adaptorID +"|attempts=" + attempts + "| File does not exist: "+toWatch.getAbsolutePath()+", streaming policy expired. File removed from streaming.");
+ } else if (!toWatch.canRead()) {
+ log.warn("Adaptor|" + adaptorID +"|attempts=" + attempts + "| File cannot be read: "+toWatch.getAbsolutePath()+", streaming policy expired. File removed from streaming.");
+ } else {
+ // Should have never been there
+ adaptorInError = false;
+ gracefulPeriodExpired = 0L;
+ attempts = 0;
+ return false;
+ }
+
+
+ ChukwaAgent agent = ChukwaAgent.getAgent();
+ if (agent != null) {
+ agent.stopAdaptor(adaptorID, false);
+ } else {
+ log.info("Agent is null, running in default mode");
+ }
+ return false;
+
+ } else if(!toWatch.exists() || !toWatch.canRead()) {
+ if (adaptorInError == false) {
+ long now = System.currentTimeMillis();
+ gracefulPeriodExpired = now + GRACEFUL_PERIOD;
+ adaptorInError = true;
+ attempts = 0;
+ log.warn("failed to stream data for: "+toWatch.getAbsolutePath()+", graceful period will Expire at now:" + now
+ + " + " + GRACEFUL_PERIOD + " secs, i.e:" + gracefulPeriodExpired);
+ } else if (attempts%10 == 0) {
+ log.info("failed to stream data for: "+toWatch.getAbsolutePath()+", attempt: "+attempts);
}
+
+ attempts++;
+ return false; //no more data
+ }
+
if (reader == null)
{
reader = new RandomAccessFile(toWatch, "r");
@@ -281,6 +309,7 @@
log.warn("failure reading " + toWatch, e);
}
attempts=0;
+ adaptorInError = false;
return hasMoreData;
}
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=748369&r1=748368&r2=748369&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Fri Feb 27 01:32:01 2009
@@ -508,6 +508,10 @@
try {
if (gracefully) {
offset = toStop.shutdown();
+ log.info("shutdown on adaptor: " + number + ", " + toStop.getCurrentStatus());
+ } else {
+ toStop.hardStop();
+ log.info("hardStop on adaptorId: " + number + ", " + toStop.getCurrentStatus());
}
} catch (AdaptorException e) {
log.error("adaptor failed to stop cleanly", e);
Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java?rev=748369&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java Fri Feb 27 01:32:01 2009
@@ -0,0 +1,115 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
+
+/**
+ * Tests that a file-tailing adaptor is removed from the agent once its file
+ * has been missing (or unreadable) for longer than
+ * {@link FileTailingAdaptor#GRACEFUL_PERIOD}.
+ */
+public class TestFileExpirationPolicy extends TestCase {
+
+  /** Grace period (ms) used by these tests, shortened so the tests finish quickly. */
+  private static final int TEST_GRACEFUL_PERIOD = 30 * 1000;
+
+  /** Value of FileTailingAdaptor.GRACEFUL_PERIOD before the test modified it. */
+  private int savedGracefulPeriod;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    savedGracefulPeriod = FileTailingAdaptor.GRACEFUL_PERIOD;
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    // GRACEFUL_PERIOD is a public mutable static; restore it so tests that run
+    // later in the same JVM do not inherit the shortened value.
+    FileTailingAdaptor.GRACEFUL_PERIOD = savedGracefulPeriod;
+    super.tearDown();
+  }
+
+  /**
+   * An adaptor pointed at a path that never existed must be dropped by the
+   * agent after the grace period has elapsed.
+   */
+  public void testExpiration() throws Exception {
+    ChukwaAgent agent = null;
+    try {
+      agent = new ChukwaAgent();
+      removeLeftoverAdaptors();
+
+      FileTailingAdaptor.GRACEFUL_PERIOD = TEST_GRACEFUL_PERIOD;
+
+      long adaptorId = agent
+          .processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped MyType 0 /myWrongPath"
+              + System.currentTimeMillis() + " 0");
+
+      assertTrue(adaptorId != -1);
+      assertTrue(agent.getAdaptorList().containsKey(adaptorId));
+
+      // Wait past the grace period (plus slack) so the adaptor gives up.
+      Thread.sleep(FileTailingAdaptor.GRACEFUL_PERIOD + 10000);
+      assertFalse(agent.getAdaptorList().containsKey(adaptorId));
+    } finally {
+      if (agent != null) {
+        agent.shutdown();
+      }
+    }
+  }
+
+  /**
+   * An adaptor whose file is deleted while it is being tailed must be
+   * dropped by the agent after the grace period has elapsed.
+   */
+  public void testExpirationOnFileThatHasBennDeleted() throws Exception {
+    ChukwaAgent agent = null;
+    File testFile = null;
+    try {
+      File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
+      if (!tempDir.exists()) {
+        tempDir.mkdirs();
+      }
+      String logFile = tempDir.getPath() + "/chukwatestExpiration.txt";
+      testFile = makeTestFile(logFile, 8000);
+
+      agent = new ChukwaAgent();
+      removeLeftoverAdaptors();
+
+      assertTrue(testFile.canRead());
+
+      FileTailingAdaptor.GRACEFUL_PERIOD = TEST_GRACEFUL_PERIOD;
+      long adaptorId = agent
+          .processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped MyType 0 "
+              + logFile + " 0");
+
+      assertTrue(adaptorId != -1);
+      assertTrue(agent.getAdaptorList().containsKey(adaptorId));
+
+      // Let the adaptor stream for a while, then remove the file from under it.
+      Thread.sleep(10000);
+      assertTrue("could not delete " + testFile, testFile.delete());
+
+      Thread.sleep(FileTailingAdaptor.GRACEFUL_PERIOD + 10000);
+      assertFalse(agent.getAdaptorList().containsKey(adaptorId));
+    } finally {
+      // Shut the agent down exactly once, on both success and failure paths.
+      if (agent != null) {
+        agent.shutdown();
+      }
+      // Do not leave the fixture behind if the test failed before deleting it.
+      if (testFile != null && testFile.exists()) {
+        testFile.delete();
+      }
+    }
+  }
+
+  /**
+   * Removes any adaptors left over from a previous run via the agent's
+   * control port, then sleeps so chunks from those streams are not observed.
+   */
+  private static void removeLeftoverAdaptors() throws Exception {
+    ChukwaConfiguration cc = new ChukwaConfiguration();
+    int portno = cc.getInt("chukwaAgent.control.port", 9093);
+    ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
+    cli.removeAll();
+    Thread.sleep(5000);
+  }
+
+  /**
+   * Writes {@code size} numbered lines of text to a new file at {@code name}.
+   *
+   * @param name path of the file to create (overwritten if present)
+   * @param size number of lines to write
+   * @return the file that was written
+   * @throws IOException if the file cannot be created or fully written
+   */
+  private File makeTestFile(String name, int size) throws IOException {
+    File tmpOutput = new File(name);
+    PrintWriter pw = new PrintWriter(new FileOutputStream(tmpOutput));
+    try {
+      for (int i = 0; i < size; ++i) {
+        pw.print(i + " ");
+        pw.println("abcdefghijklmnopqrstuvwxyz");
+      }
+    } finally {
+      pw.close();
+    }
+    // PrintWriter never throws IOException; surface write failures explicitly.
+    if (pw.checkError()) {
+      throw new IOException("error writing test file " + tmpOutput);
+    }
+    return tmpOutput;
+  }
+}