You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2009/02/27 02:32:02 UTC

svn commit: r748369 - in /hadoop/core/trunk/src/contrib/chukwa/src: java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/ java/org/apache/hadoop/chukwa/datacollection/agent/ test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/

Author: eyang
Date: Fri Feb 27 01:32:01 2009
New Revision: 748369

URL: http://svn.apache.org/viewvc?rev=748369&view=rev
Log:
HADOOP-4893.  Added a grace period during which the adaptor waits for a late-arriving data stream.

Added:
    hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
Modified:
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=748369&r1=748368&r2=748369&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java Fri Feb 27 01:32:01 2009
@@ -51,9 +51,12 @@
 	public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024 ;
 	public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE ;
 	public static int MAX_RETRIES = 300;
+	public static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes
+  
 	protected static Configuration conf = null;
 	private int attempts = 0;
-	
+	private long gracefulPeriodExpired = 0l;
+	private boolean adaptorInError = false;
 	File toWatch;
 	/**
 	 * next PHYSICAL offset to read
@@ -155,22 +158,47 @@
 	 */
 	public synchronized boolean tailFile(ChunkReceiver eq) throws InterruptedException {
     boolean hasMoreData = false;
+
+    
 	    try {
-	        if(!toWatch.exists() && attempts>MAX_RETRIES) {
-	    	    log.warn("Adaptor|" + adaptorID +"| File does not exist: "+toWatch.getAbsolutePath()+", streaming policy expired.  File removed from streaming.");
-       			ChukwaAgent agent = ChukwaAgent.getAgent();
-    			if (agent != null) {
-    				agent.stopAdaptor(adaptorID, false);
-    			} else {
-    				log.info("Agent is null, running in default mode");
-    			}
-    		    tailer.stopWatchingFile(this);
-	    	    return false;
-	        } else if(!toWatch.exists()) {
-	        	log.warn("failed to stream data for: "+toWatch.getAbsolutePath()+", attempt: "+attempts+" of "+MAX_RETRIES);
-	        	attempts++;
-	            return false;  //no more data
+	      if( (adaptorInError == true)  && (System.currentTimeMillis() > gracefulPeriodExpired)) {
+	        if (!toWatch.exists()) {
+	          log.warn("Adaptor|" + adaptorID +"|attempts=" +  attempts + "| File does not exist: "+toWatch.getAbsolutePath()+", streaming policy expired.  File removed from streaming.");
+	        } else if (!toWatch.canRead()) {
+	          log.warn("Adaptor|" + adaptorID +"|attempts=" +  attempts + "| File cannot be read: "+toWatch.getAbsolutePath()+", streaming policy expired.  File removed from streaming.");
+	        } else {
+	          // Should have never been there
+	          adaptorInError = false;
+	          gracefulPeriodExpired = 0L;
+	          attempts = 0;
+	          return false;
+	        }
+
+
+	        ChukwaAgent agent = ChukwaAgent.getAgent();
+	        if (agent != null) {
+	          agent.stopAdaptor(adaptorID, false);
+	        } else {
+	          log.info("Agent is null, running in default mode");
+	        }
+	        return false;
+
+	      } else if(!toWatch.exists() || !toWatch.canRead()) {
+	        if (adaptorInError == false) {
+	          long now = System.currentTimeMillis();
+	          gracefulPeriodExpired = now + GRACEFUL_PERIOD;
+	          adaptorInError = true;
+	          attempts = 0;
+	          log.warn("failed to stream data for: "+toWatch.getAbsolutePath()+", graceful period will Expire at now:" + now 
+	              + " + " +  GRACEFUL_PERIOD + " secs, i.e:" + gracefulPeriodExpired);
+	        } else if (attempts%10 == 0) {
+	            log.info("failed to stream data for: "+toWatch.getAbsolutePath()+", attempt: "+attempts);  
 	        }
+
+	        attempts++;
+	        return false;  //no more data
+	      } 
+	        
 	      	if (reader == null)
 	      	{
 	      		reader = new RandomAccessFile(toWatch, "r");
@@ -281,6 +309,7 @@
 	    	log.warn("failure reading " + toWatch, e);
 	    }
 	    attempts=0;
+	    adaptorInError = false;
 	    return hasMoreData;
 	}
 	

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=748369&r1=748368&r2=748369&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Fri Feb 27 01:32:01 2009
@@ -508,6 +508,10 @@
     try {    	      
       if (gracefully) {
    	    offset = toStop.shutdown(); 
+   	   log.info("shutdown on adaptor: " + number + ", " + toStop.getCurrentStatus());
+      } else { 
+        toStop.hardStop();
+        log.info("hardStop on adaptorId: " + number + ", " + toStop.getCurrentStatus());
       }
     } catch (AdaptorException e) {
       log.error("adaptor failed to stop cleanly", e);

Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java?rev=748369&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileExpirationPolicy.java Fri Feb 27 01:32:01 2009
@@ -0,0 +1,115 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
+
+public class TestFileExpirationPolicy extends TestCase {
+
+  /** Grace period used by these tests, in milliseconds. */
+  private static final int TEST_GRACEFUL_PERIOD = 30 * 1000;
+  /** Value GRACEFUL_PERIOD had before setUp() shortened it. */
+  private int savedGracefulPeriod;
+
+  protected void setUp() throws Exception {
+    super.setUp();
+    savedGracefulPeriod = FileTailingAdaptor.GRACEFUL_PERIOD;
+    FileTailingAdaptor.GRACEFUL_PERIOD = TEST_GRACEFUL_PERIOD;
+  }
+
+  protected void tearDown() throws Exception {
+    // GRACEFUL_PERIOD is static: restore it so later tests see the previous value.
+    FileTailingAdaptor.GRACEFUL_PERIOD = savedGracefulPeriod;
+    super.tearDown();
+  }
+
+  /** An adaptor on a file that never exists is removed after the grace period. */
+  public void testExpiration() throws Exception {
+    ChukwaAgent agent = null;
+    try {
+      agent = new ChukwaAgent();
+      removeLeftoverAdaptors();
+
+      long adaptorId = addTailingAdaptor(agent, "/myWrongPath" + System.currentTimeMillis());
+      assertTrue("adaptor not registered", agent.getAdaptorList().containsKey(adaptorId));
+
+      Thread.sleep(FileTailingAdaptor.GRACEFUL_PERIOD + 10000);
+      assertFalse("adaptor not expired", agent.getAdaptorList().containsKey(adaptorId));
+    } finally {
+      if (agent != null) {
+        agent.shutdown();
+      }
+    }
+  }
+
+  /** An adaptor whose file is deleted mid-stream is removed after the grace period. */
+  public void testExpirationOnFileThatHasBennDeleted() throws Exception {
+    ChukwaAgent agent = null;
+    File testFile = null;
+    try {
+      File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
+      tempDir.mkdirs(); // no-op when the directory already exists
+      String logFile = tempDir.getPath() + "/chukwatestExpiration.txt";
+      testFile = makeTestFile(logFile, 8000);
+
+      agent = new ChukwaAgent();
+      removeLeftoverAdaptors();
+      assertTrue("test file not readable", testFile.canRead());
+
+      long adaptorId = addTailingAdaptor(agent, logFile);
+      assertTrue("adaptor not registered", agent.getAdaptorList().containsKey(adaptorId));
+
+      Thread.sleep(10000);
+      assertTrue("could not delete test file", testFile.delete());
+      Thread.sleep(FileTailingAdaptor.GRACEFUL_PERIOD + 10000);
+      assertFalse("adaptor not expired", agent.getAdaptorList().containsKey(adaptorId));
+    } finally {
+      if (agent != null) {
+        agent.shutdown();
+      }
+      // Do not leave the fixture behind when the test fails before deleting it.
+      if (testFile != null && testFile.exists()) {
+        testFile.delete();
+      }
+    }
+  }
+
+  /** Removes adaptors left over from a previous run, then sleeps so no chunk from existing streams arrives. */
+  private void removeLeftoverAdaptors() throws Exception {
+    ChukwaConfiguration cc = new ChukwaConfiguration();
+    int portno = cc.getInt("chukwaAgent.control.port", 9093);
+    new ChukwaAgentController("localhost", portno).removeAll();
+    Thread.sleep(5000);
+  }
+
+  /** Asks the agent to tail {@code path}, asserts the command was accepted, and returns the adaptor id. */
+  private long addTailingAdaptor(ChukwaAgent agent, String path) throws Exception {
+    long adaptorId = agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped MyType 0 "
+        + path + " 0");
+    assertTrue("agent rejected the add command", adaptorId != -1);
+    return adaptorId;
+  }
+
+  /** Writes {@code size} numbered lines to {@code name}; the writer is closed (and flushed) even on failure. */
+  private File makeTestFile(String name, int size) throws IOException {
+    File tmpOutput = new File(name);
+    PrintWriter pw = new PrintWriter(new FileOutputStream(tmpOutput));
+    try {
+      for (int i = 0; i < size; ++i) {
+        pw.print(i + " ");
+        pw.println("abcdefghijklmnopqrstuvwxyz");
+      }
+    } finally {
+      pw.close();
+    }
+    return tmpOutput;
+  }
+}