You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by ey...@apache.org on 2013/07/07 04:19:19 UTC
svn commit: r1500353 - in /incubator/chukwa/trunk: ./
src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/
src/site/apt/
src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/
Author: eyang
Date: Sun Jul 7 02:19:19 2013
New Revision: 1500353
URL: http://svn.apache.org/r1500353
Log:
CHUKWA-686. Added FileTailingAdaptorPreserveLines adaptor. (Sourygna Luangsay via Eric Yang)
Added:
incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptorPreserveLines.java
incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
Modified:
incubator/chukwa/trunk/CHANGES.txt
incubator/chukwa/trunk/src/site/apt/agent.apt
Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=1500353&r1=1500352&r2=1500353&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Sun Jul 7 02:19:19 2013
@@ -14,6 +14,8 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ CHUKWA-686. Added FileTailingAdaptorPreserveLines adaptor. (Sourygna Luangsay via Eric Yang)
+
CHUKWA-648. Make Chukwa Reduce Type to support hierarchy format. (Jie Huang via asrabkin)
CHUKWA-650. Re-configure Demux ReduceNumber without re-starting the DemuxManager service. (Jie Huang via asrabkin)
Added: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptorPreserveLines.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptorPreserveLines.java?rev=1500353&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptorPreserveLines.java (added)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptorPreserveLines.java Sun Jul 7 02:19:19 2013
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+
+/**
+ * A subclass of FileTailingAdaptor that reads UTF8/ascii files and only sends
+ * chunks that end on a complete line. Unlike a per-line adaptor, every line
+ * in the chunk is covered by a single record offset.
+ */
+public class FileTailingAdaptorPreserveLines extends FileTailingAdaptor {
+
+  /** Byte value that terminates a line. */
+  private static final char SEPARATOR = '\n';
+
+  /**
+   * Emits at most one chunk containing every byte of {@code buf} up to and
+   * including its last newline. Bytes after that newline (a partial line)
+   * are not included in the chunk.
+   *
+   * @param eq receiver the chunk is added to
+   * @param buffOffsetInFile added to the number of consumed bytes to form the
+   *        third ChunkImpl constructor argument (presumably the file position
+   *        of buf[0] -- confirm against FileTailingAdaptor)
+   * @param buf bytes to scan for the last newline
+   * @return number of bytes placed in the chunk, or 0 when {@code buf}
+   *         contains no newline and nothing was sent
+   * @throws InterruptedException declared by the overridden method; presumably
+   *         raised by {@code eq.add} -- confirm against ChunkReceiver
+   */
+  @Override
+  protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
+      byte[] buf) throws InterruptedException {
+    // -1 is the "not found" marker. Index 0 is a legitimate newline position
+    // (buffer starting with an empty line) and must not be mistaken for it.
+    int lastNewLineOffset = -1;
+    for (int i = buf.length - 1; i >= 0; --i) {
+      if (buf[i] == SEPARATOR) {
+        lastNewLineOffset = i;
+        break;
+      }
+    }
+
+    if (lastNewLineOffset < 0) {
+      // No complete line in the buffer: send nothing, consume nothing.
+      return 0;
+    }
+
+    // The newline itself occupies one byte, hence the +1.
+    int bytesUsed = lastNewLineOffset + 1;
+    ChunkImpl event = new ChunkImpl(type, toWatch.getAbsolutePath(),
+        buffOffsetInFile + bytesUsed, Arrays.copyOf(buf, bytesUsed), this);
+
+    // A single record offset (the last newline) for the whole chunk.
+    event.setRecordOffsets(new int[] { lastNewLineOffset });
+    eq.add(event);
+
+    return bytesUsed;
+  }
+}
Modified: incubator/chukwa/trunk/src/site/apt/agent.apt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/site/apt/agent.apt?rev=1500353&r1=1500352&r2=1500353&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/site/apt/agent.apt (original)
+++ incubator/chukwa/trunk/src/site/apt/agent.apt Sun Jul 7 02:19:19 2013
@@ -144,6 +144,13 @@ add filetailer.FileTailingAdaptor BarDat
Chukwa-formatted log files, where exception
stack traces stay in a single chunk.
+ * <<filetailer.FileTailingAdaptorPreserveLines>>
+ Similar to CharFileTailingAdaptorUTF8. The difference is mainly visible
+ in the Chukwa Demux process: with CharFileTailingAdaptorUTF8, Demux processes
+ every line one by one, whereas with FileTailingAdaptorPreserveLines it processes
+ all the lines of a Chunk in one go, which makes the Demux jobs faster.
+ Same parameters and usage as the above.
+
* <<DirTailingAdaptor>> Takes a directory path and an
adaptor name as mandatory parameters; repeatedly scans that directory
and all subdirectories, and starts the indicated adaptor running on
Added: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java?rev=1500353&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java (added)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java Sun Jul 7 02:19:19 2013
@@ -0,0 +1,134 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import static org.apache.hadoop.chukwa.util.TempFileUtil.makeTestFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for the FileTailingAdaptorPreserveLines adaptor: chunks must end on
+ * line boundaries, follow each other without gaps, and carry a single
+ * record offset.
+ */
+public class TestFileTailingAdaptorPreserveLines {
+  // Adaptor under test. FileTailingAdaptor and CharFileTailingAdaptorUTF8
+  // were the alternatives previously tried here for comparison.
+  private static final String ADAPTOR_TO_TEST = "FileTailingAdaptorPreserveLines";
+  // How long to wait for a chunk, passed unchanged to waitForAChunk.
+  private static final int CHUNK_TIMEOUT_MS = 5000;
+
+  private static File testFile;
+  private static ChukwaConfiguration conf;
+  private ChukwaAgent agent;
+  private String adaptorId;
+  private ChunkCatcherConnector chunks;
+
+  /**
+   * Creates the shared test file and agent configuration.
+   *
+   * @throws Exception if the test file cannot be created
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    File baseDir = new File(System.getProperty("test.build.data", "/tmp"));
+    testFile = makeTestFile("TestFileTailingAdaptorPreserveLines", 10,
+        baseDir);
+
+    conf = new ChukwaConfiguration();
+    // Small read size; presumably forces the file to be split over several
+    // chunks, which the tests below rely on -- confirm against the adaptor.
+    conf.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 130);
+  }
+
+  /**
+   * Starts an agent and a chunk catcher, then registers the adaptor under
+   * test on the test file.
+   *
+   * @throws Exception if the agent or the adaptor cannot be started
+   */
+  @Before
+  public void setUp() throws Exception {
+    agent = new ChukwaAgent(conf);
+    chunks = new ChunkCatcherConnector();
+    chunks.start();
+
+    adaptorId = agent.processAddCommand("add adaptor_test =" + "filetailer."
+        + ADAPTOR_TO_TEST + " TestFileTailingAdaptorPreserveLines "
+        + testFile.getCanonicalPath() + " 0");
+  }
+
+  /**
+   * Stops the adaptor and the agent, then drains and shuts down the catcher.
+   *
+   * @throws Exception if shutting down fails
+   */
+  @After
+  public void tearDown() throws Exception {
+    agent.stopAdaptor(adaptorId, false);
+    agent.shutdown();
+    chunks.clear();
+    chunks.shutdown();
+  }
+
+  /**
+   * Deletes the shared test file.
+   *
+   * @throws Exception never thrown by the current body; kept for signature
+   *         compatibility
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    if (testFile != null) {
+      testFile.delete();
+    }
+  }
+
+  /**
+   * Waits for the next chunk and fails with a readable message instead of a
+   * NullPointerException when none is delivered.
+   */
+  private Chunk nextChunk() {
+    Chunk c = chunks.waitForAChunk(CHUNK_TIMEOUT_MS);
+    assertTrue("no chunk received within " + CHUNK_TIMEOUT_MS + " ms",
+        c != null);
+    return c;
+  }
+
+  /** Decodes the chunk data as UTF-8 and splits it into lines. */
+  private static String[] linesOf(Chunk c)
+      throws UnsupportedEncodingException {
+    String data = new String(c.getData(), "UTF-8");
+    return data.split("\\r?\\n");
+  }
+
+  /**
+   * Check that a chunk does not break lines (which is the problem of the
+   * FileTailingAdaptor adaptor).
+   *
+   * @throws UnsupportedEncodingException if UTF-8 is not supported
+   */
+  @Test
+  public void testDontBreakLines() throws UnsupportedEncodingException {
+    String[] lines = linesOf(nextChunk());
+
+    // Check that the length of the last line is the same as that of the
+    // first line. Otherwise, it means the last line has been cut.
+    assertEquals(lines[0].length(), lines[lines.length - 1].length());
+  }
+
+  /**
+   * Check that the second chunk contains the data that directly follows the
+   * first chunk's data.
+   *
+   * @throws UnsupportedEncodingException if UTF-8 is not supported
+   */
+  @Test
+  public void testSecondChunkDataFollowsFirstChunkData()
+      throws UnsupportedEncodingException {
+    String[] lines1 = linesOf(nextChunk());
+    String[] lines2 = linesOf(nextChunk());
+
+    // The first character of each line is compared as its line number.
+    int numLastLineChunk1 = (int) (lines1[lines1.length - 1].charAt(0));
+    int numLastLineChunk2 = (int) (lines2[0].charAt(0));
+
+    // Check that line numbers are successive between the last line of the
+    // first chunk and the first line of the second chunk.
+    assertEquals(numLastLineChunk1, numLastLineChunk2 - 1);
+  }
+
+  /**
+   * Check that a chunk has only one record offset although it has more than
+   * two lines (which is the contrary of CharFileTailingAdaptorUTF8).
+   *
+   * @throws UnsupportedEncodingException if UTF-8 is not supported
+   */
+  @Test
+  public void testOnlyOneSetRecordOffset()
+      throws UnsupportedEncodingException {
+    Chunk c = nextChunk();
+    String[] lines = linesOf(c);
+
+    // Check that we have more than two lines
+    assertTrue(lines.length > 2);
+
+    int[] offsets_i = c.getRecordOffsets();
+
+    // Check that we only have one offset
+    assertEquals(1, offsets_i.length);
+  }
+}