You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/12/21 22:13:28 UTC

svn commit: r892979 - in /hadoop/chukwa/trunk/src: java/org/apache/hadoop/chukwa/datacollection/adaptor/ java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/ test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/

Author: asrabkin
Date: Mon Dec 21 21:13:15 2009
New Revision: 892979

URL: http://svn.apache.org/viewvc?rev=892979&view=rev
Log:
wrong version of CHUKWA-421

Modified:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java Mon Dec 21 21:13:15 2009
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.chukwa.datacollection.adaptor;
 
 import java.util.*;

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java Mon Dec 21 21:13:15 2009
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.chukwa.datacollection.adaptor;
 
 import java.util.*;

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java Mon Dec 21 21:13:15 2009
@@ -1,6 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.chukwa.datacollection.adaptor;
 
-
 public interface NotifyOnCommitAdaptor extends Adaptor {
     abstract void committed(long commitedByte);
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java Mon Dec 21 21:13:15 2009
@@ -41,7 +41,7 @@
 
   private List<LWFTAdaptor> adaptors;
   private volatile boolean isRunning = true;
-  ChunkQueue eq; // not private -- useful for file tailing adaptor classes
+//  ChunkQueue eq; // not private -- useful for file tailing adaptor classes
 
   /**
    * How often to tail each file.
@@ -56,7 +56,7 @@
     SAMPLE_PERIOD_MS = conf.getInt(
         "chukwaAgent.adaptor.context.switch.time",
         DEFAULT_SAMPLE_PERIOD_MS);
-    eq = DataFactory.getInstance().getEventQueue();
+//    eq = DataFactory.getInstance().getEventQueue();
 
     // iterations are much more common than adding a new adaptor
     adaptors = new CopyOnWriteArrayList<LWFTAdaptor>();
@@ -81,7 +81,7 @@
         boolean shouldISleep = true;
         long startTime = System.currentTimeMillis();
         for (LWFTAdaptor f : adaptors) {
-          boolean hasMoreData = f.tailFile(eq);
+          boolean hasMoreData = f.tailFile();
           shouldISleep &= !hasMoreData;
         }
         long timeToReadFiles = System.currentTimeMillis() - startTime;

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java Mon Dec 21 21:13:15 2009
@@ -100,7 +100,7 @@
         if (toWatch.exists()) {
           int retry = 0;
           tailer.stopWatchingFile(this);
-          TerminatorThread lastTail = new TerminatorThread(this, tailer.eq);
+          TerminatorThread lastTail = new TerminatorThread(this);
           lastTail.setDaemon(true);
           lastTail.start();
           
@@ -129,7 +129,7 @@
       }
       break;
     }
-    log.info("Exist Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
+    log.info("Exit Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
     return fileReadOffset + offsetOfFirstByte;
   }
   
@@ -141,7 +141,8 @@
    * 
    * @param eq the queue to write Chunks into
    */
-  public synchronized boolean tailFile(ChunkReceiver eq)
+  @Override
+  public synchronized boolean tailFile()
       throws InterruptedException {
     boolean hasMoreData = false;
 

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java Mon Dec 21 21:13:15 2009
@@ -206,7 +206,7 @@
     return hasMoreData;
   }
   
-  public synchronized boolean tailFile(ChunkReceiver eq)
+  public synchronized boolean tailFile()
   throws InterruptedException {
     boolean hasMoreData = false;
     try {

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java Mon Dec 21 21:13:15 2009
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
 
 import java.io.File;
@@ -35,8 +52,9 @@
         return -1;
       else if (mod > o.mod)
         return 1;
-      //want toWatch to be last
-      else return (o.f.getName().compareTo(f.getName()));//shouldn't happen?
+      //want toWatch to be last after a rotation; otherwise, this is basically 
+      //just a heuristic that hasn't been tuned yet
+      else return (o.f.getName().compareTo(f.getName()));
     }
   }
   
@@ -81,7 +99,7 @@
     File toWatchDir = toWatch.getParentFile();
     File[] candidates = toWatchDir.listFiles(this);
     if(candidates == null) {
-      log.error(toWatchDir + " is not a directory");
+      log.error(toWatchDir + " is not a directory in "+adaptorID);
     } else {
       log.debug("saw " + candidates.length + " files matching pattern");
       fileQ = new LinkedList<FPair>();
@@ -113,7 +131,7 @@
   }
   
   @Override
-  public synchronized boolean tailFile(ChunkReceiver eq)
+  public synchronized boolean tailFile()
   throws InterruptedException {
     boolean hasMoreData = false;
     try {
@@ -126,10 +144,13 @@
       if(cur == null) //file we're watching doesn't exist
         return false;
       
-      log.debug("treating " + cur + " as " + toWatch);
       
       long len = cur.length();
       long tsPreTail = cur.exists() ? cur.lastModified() : prevFileLastModDate;
+
+      if(log.isDebugEnabled())
+        log.debug(adaptorID + " treating " + cur + " as " + toWatch + " len = " + len);
+      
       if(len < fileReadOffset) {
         log.info("file "+ cur +" shrank from " + fileReadOffset + " to " + len);
         //no unseen changes to prev version, since mod date is older than last scan.
@@ -155,10 +176,15 @@
       }
         
     } catch(IOException e) {
-      log.warn("IOException in tailer", e);
+      log.warn("IOException in "+adaptorID, e);
       deregisterAndStop(false);
     }
     
     return hasMoreData;
   }
+  
+
+  public String toString() {
+    return "Rotation-aware Tailer on " + toWatch;
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java Mon Dec 21 21:13:15 2009
@@ -26,11 +26,9 @@
   private static Logger log = Logger.getLogger(TerminatorThread.class);
 
   private FileTailingAdaptor adaptor = null;
-  private ChunkReceiver eq = null;
 
-  public TerminatorThread(FileTailingAdaptor adaptor, ChunkReceiver eq) {
+  public TerminatorThread(FileTailingAdaptor adaptor) {
     this.adaptor = adaptor;
-    this.eq = eq;
   }
 
   public void run() {
@@ -40,7 +38,7 @@
     int count = 0;
     log.info("Terminator thread started." + adaptor.toWatch.getPath());
     try {
-      while (adaptor.tailFile(eq)) {
+      while (adaptor.tailFile()) {
         if (log.isDebugEnabled()) {
           log.debug("Terminator thread:" + adaptor.toWatch.getPath()
               + " still working");

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java Mon Dec 21 21:13:15 2009
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
 
 import java.io.File;
@@ -5,14 +22,17 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
 import org.apache.hadoop.conf.Configuration;
 import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
 import org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor;
 import org.apache.log4j.Level;
 
-public class TestRCheckAdaptor extends TestCase {
+public class TestRCheckAdaptor extends TestCase implements ChunkReceiver {
   
   ChunkCatcherConnector chunks;
 
@@ -21,7 +41,7 @@
     chunks.start();
   }
 
-  public void testLogRotate() throws IOException, InterruptedException,
+  public void testBaseCases() throws IOException, InterruptedException,
       ChukwaAgent.AlreadyRunningException {
     Configuration conf = new Configuration();
     conf.set("chukwaAgent.control.port", "0");
@@ -89,5 +109,59 @@
     agent.shutdown();
     Thread.sleep(2000);
   }
+  
+  
+  public void testContinuously() throws Exception {
+    File baseDir = new File(System.getProperty("test.build.data", "/tmp") + "/rcheck");
+    TestDirTailingAdaptor.createEmptyDir(baseDir);
+    File tmpOutput = new File(baseDir, "continuousTest");
+    PrintWriter pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
+    LWFTAdaptor.tailer.SAMPLE_PERIOD_MS = 2000;
+
+//    RCheckFTAdaptor.log.setLevel(Level.DEBUG);
+    RCheckFTAdaptor rca = new RCheckFTAdaptor();
+    rca.parseArgs("Test", tmpOutput.getAbsolutePath(), AdaptorManager.NULL);
+    rca.start("id", "Test", 0, this);
+    
+
+    Thread.sleep(1000);
+    for(int i= 0; i < 200; ++i) {
+      Thread.sleep(120);
+      pw.println("This is line:" + i);
+      if( i % 5 == 0)
+        pw.flush();
+      if(i % 20 == 0) {
+        System.err.println("rotating");
+        pw.close();
+        tmpOutput.renameTo( new File(baseDir, "continuousTest."+(i/10)));
+        pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
+      }
+    }
+    Thread.sleep(1000);
+
+    rca.shutdown(AdaptorShutdownPolicy.HARD_STOP);
+    
+  }
+
+  volatile int nextExpectedLine = 0;
+  
+  @Override
+  public void add(Chunk event) throws InterruptedException {
+//    System.out.println("got a chunk; len = " + event.getData().length);
+    String[] lines = new String(event.getData()).split("\n");
+    System.err.println("got chunk; " + lines.length + " lines " + event.getData().length + " bytes");
+    for(String line: lines) {
+      String n = line.substring(line.indexOf(':')+1);
+      int i = Integer.parseInt(n);
+//      System.out.println("saw "+i);
+      if(i != nextExpectedLine) {
+        System.err.println("lines out of order: saw " + i + " expected " + nextExpectedLine);
+        System.exit(0);
+        fail();
+      }
+      nextExpectedLine = i+1;
+    
+    }
+  }
 
 }