You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/12/21 22:13:28 UTC
svn commit: r892979 - in /hadoop/chukwa/trunk/src:
java/org/apache/hadoop/chukwa/datacollection/adaptor/
java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/
test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/
Author: asrabkin
Date: Mon Dec 21 21:13:15 2009
New Revision: 892979
URL: http://svn.apache.org/viewvc?rev=892979&view=rev
Log:
wrong version of CHUKWA-421
Modified:
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java Mon Dec 21 21:13:15 2009
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.chukwa.datacollection.adaptor;
import java.util.*;
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java Mon Dec 21 21:13:15 2009
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.chukwa.datacollection.adaptor;
import java.util.*;
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java Mon Dec 21 21:13:15 2009
@@ -1,6 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.chukwa.datacollection.adaptor;
-
public interface NotifyOnCommitAdaptor extends Adaptor {
abstract void committed(long commitedByte);
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java Mon Dec 21 21:13:15 2009
@@ -41,7 +41,7 @@
private List<LWFTAdaptor> adaptors;
private volatile boolean isRunning = true;
- ChunkQueue eq; // not private -- useful for file tailing adaptor classes
+// ChunkQueue eq; // not private -- useful for file tailing adaptor classes
/**
* How often to tail each file.
@@ -56,7 +56,7 @@
SAMPLE_PERIOD_MS = conf.getInt(
"chukwaAgent.adaptor.context.switch.time",
DEFAULT_SAMPLE_PERIOD_MS);
- eq = DataFactory.getInstance().getEventQueue();
+// eq = DataFactory.getInstance().getEventQueue();
// iterations are much more common than adding a new adaptor
adaptors = new CopyOnWriteArrayList<LWFTAdaptor>();
@@ -81,7 +81,7 @@
boolean shouldISleep = true;
long startTime = System.currentTimeMillis();
for (LWFTAdaptor f : adaptors) {
- boolean hasMoreData = f.tailFile(eq);
+ boolean hasMoreData = f.tailFile();
shouldISleep &= !hasMoreData;
}
long timeToReadFiles = System.currentTimeMillis() - startTime;
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java Mon Dec 21 21:13:15 2009
@@ -100,7 +100,7 @@
if (toWatch.exists()) {
int retry = 0;
tailer.stopWatchingFile(this);
- TerminatorThread lastTail = new TerminatorThread(this, tailer.eq);
+ TerminatorThread lastTail = new TerminatorThread(this);
lastTail.setDaemon(true);
lastTail.start();
@@ -129,7 +129,7 @@
}
break;
}
- log.info("Exist Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
+ log.info("Exit Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
return fileReadOffset + offsetOfFirstByte;
}
@@ -141,7 +141,8 @@
*
* @param eq the queue to write Chunks into
*/
- public synchronized boolean tailFile(ChunkReceiver eq)
+ @Override
+ public synchronized boolean tailFile()
throws InterruptedException {
boolean hasMoreData = false;
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java Mon Dec 21 21:13:15 2009
@@ -206,7 +206,7 @@
return hasMoreData;
}
- public synchronized boolean tailFile(ChunkReceiver eq)
+ public synchronized boolean tailFile()
throws InterruptedException {
boolean hasMoreData = false;
try {
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java Mon Dec 21 21:13:15 2009
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
import java.io.File;
@@ -35,8 +52,9 @@
return -1;
else if (mod > o.mod)
return 1;
- //want toWatch to be last
- else return (o.f.getName().compareTo(f.getName()));//shouldn't happen?
+ //want toWatch to be last after a rotation; otherwise, this is basically
+ //just a heuristic that hasn't been tuned yet
+ else return (o.f.getName().compareTo(f.getName()));
}
}
@@ -81,7 +99,7 @@
File toWatchDir = toWatch.getParentFile();
File[] candidates = toWatchDir.listFiles(this);
if(candidates == null) {
- log.error(toWatchDir + " is not a directory");
+ log.error(toWatchDir + " is not a directory in "+adaptorID);
} else {
log.debug("saw " + candidates.length + " files matching pattern");
fileQ = new LinkedList<FPair>();
@@ -113,7 +131,7 @@
}
@Override
- public synchronized boolean tailFile(ChunkReceiver eq)
+ public synchronized boolean tailFile()
throws InterruptedException {
boolean hasMoreData = false;
try {
@@ -126,10 +144,13 @@
if(cur == null) //file we're watching doesn't exist
return false;
- log.debug("treating " + cur + " as " + toWatch);
long len = cur.length();
long tsPreTail = cur.exists() ? cur.lastModified() : prevFileLastModDate;
+
+ if(log.isDebugEnabled())
+ log.debug(adaptorID + " treating " + cur + " as " + toWatch + " len = " + len);
+
if(len < fileReadOffset) {
log.info("file "+ cur +" shrank from " + fileReadOffset + " to " + len);
//no unseen changes to prev version, since mod date is older than last scan.
@@ -155,10 +176,15 @@
}
} catch(IOException e) {
- log.warn("IOException in tailer", e);
+ log.warn("IOException in "+adaptorID, e);
deregisterAndStop(false);
}
return hasMoreData;
}
+
+
+ public String toString() {
+ return "Rotation-aware Tailer on " + toWatch;
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java Mon Dec 21 21:13:15 2009
@@ -26,11 +26,9 @@
private static Logger log = Logger.getLogger(TerminatorThread.class);
private FileTailingAdaptor adaptor = null;
- private ChunkReceiver eq = null;
- public TerminatorThread(FileTailingAdaptor adaptor, ChunkReceiver eq) {
+ public TerminatorThread(FileTailingAdaptor adaptor) {
this.adaptor = adaptor;
- this.eq = eq;
}
public void run() {
@@ -40,7 +38,7 @@
int count = 0;
log.info("Terminator thread started." + adaptor.toWatch.getPath());
try {
- while (adaptor.tailFile(eq)) {
+ while (adaptor.tailFile()) {
if (log.isDebugEnabled()) {
log.debug("Terminator thread:" + adaptor.toWatch.getPath()
+ " still working");
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java?rev=892979&r1=892978&r2=892979&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java Mon Dec 21 21:13:15 2009
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
import java.io.File;
@@ -5,14 +22,17 @@
import java.io.IOException;
import java.io.PrintWriter;
import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
import org.apache.hadoop.conf.Configuration;
import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
import org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor;
import org.apache.log4j.Level;
-public class TestRCheckAdaptor extends TestCase {
+public class TestRCheckAdaptor extends TestCase implements ChunkReceiver {
ChunkCatcherConnector chunks;
@@ -21,7 +41,7 @@
chunks.start();
}
- public void testLogRotate() throws IOException, InterruptedException,
+ public void testBaseCases() throws IOException, InterruptedException,
ChukwaAgent.AlreadyRunningException {
Configuration conf = new Configuration();
conf.set("chukwaAgent.control.port", "0");
@@ -89,5 +109,59 @@
agent.shutdown();
Thread.sleep(2000);
}
+
+
+ public void testContinuously() throws Exception {
+ File baseDir = new File(System.getProperty("test.build.data", "/tmp") + "/rcheck");
+ TestDirTailingAdaptor.createEmptyDir(baseDir);
+ File tmpOutput = new File(baseDir, "continuousTest");
+ PrintWriter pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
+ LWFTAdaptor.tailer.SAMPLE_PERIOD_MS = 2000;
+
+// RCheckFTAdaptor.log.setLevel(Level.DEBUG);
+ RCheckFTAdaptor rca = new RCheckFTAdaptor();
+ rca.parseArgs("Test", tmpOutput.getAbsolutePath(), AdaptorManager.NULL);
+ rca.start("id", "Test", 0, this);
+
+
+ Thread.sleep(1000);
+ for(int i= 0; i < 200; ++i) {
+ Thread.sleep(120);
+ pw.println("This is line:" + i);
+ if( i % 5 == 0)
+ pw.flush();
+ if(i % 20 == 0) {
+ System.err.println("rotating");
+ pw.close();
+ tmpOutput.renameTo( new File(baseDir, "continuousTest."+(i/10)));
+ pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
+ }
+ }
+ Thread.sleep(1000);
+
+ rca.shutdown(AdaptorShutdownPolicy.HARD_STOP);
+
+ }
+
+ volatile int nextExpectedLine = 0;
+
+ @Override
+ public void add(Chunk event) throws InterruptedException {
+// System.out.println("got a chunk; len = " + event.getData().length);
+ String[] lines = new String(event.getData()).split("\n");
+ System.err.println("got chunk; " + lines.length + " lines " + event.getData().length + " bytes");
+ for(String line: lines) {
+ String n = line.substring(line.indexOf(':')+1);
+ int i = Integer.parseInt(n);
+// System.out.println("saw "+i);
+ if(i != nextExpectedLine) {
+ System.err.println("lines out of order: saw " + i + " expected " + nextExpectedLine);
+ System.exit(0);
+ fail();
+ }
+ nextExpectedLine = i+1;
+
+ }
+ }
}