You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by ey...@apache.org on 2013/07/07 04:19:19 UTC

svn commit: r1500353 - in /incubator/chukwa/trunk: ./ src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/ src/site/apt/ src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/

Author: eyang
Date: Sun Jul  7 02:19:19 2013
New Revision: 1500353

URL: http://svn.apache.org/r1500353
Log:
CHUKWA-686. Added FileTailingAdaptorPreserveLines adaptor. (Sourygna Luangsay via Eric Yang)

Added:
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptorPreserveLines.java
    incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
Modified:
    incubator/chukwa/trunk/CHANGES.txt
    incubator/chukwa/trunk/src/site/apt/agent.apt

Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=1500353&r1=1500352&r2=1500353&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Sun Jul  7 02:19:19 2013
@@ -14,6 +14,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    CHUKWA-686. Added FileTailingAdaptorPreserveLines adaptor. (Sourygna Luangsay via Eric Yang)
+
     CHUKWA-648. Make Chukwa Reduce Type to support hierarchy format. (Jie Huang via asrabkin)
 
     CHUKWA-650. Re-configure Demux ReduceNumber without re-starting the DemuxManager service. (Jie Huang via asrabkin)

Added: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptorPreserveLines.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptorPreserveLines.java?rev=1500353&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptorPreserveLines.java (added)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptorPreserveLines.java Sun Jul  7 02:19:19 2013
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+
+/**
+ * A subclass of FileTailingAdaptor that reads UTF8/ascii files and only send
+ * chunks with complete lines.
+ */
+public class FileTailingAdaptorPreserveLines extends FileTailingAdaptor {
+
+  private static final char SEPARATOR = '\n';
+
+  @Override
+  protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
+      byte[] buf) throws InterruptedException {
+    int lastNewLineOffset = 0;
+    for (int i = buf.length - 1; i >= 0; --i) {
+      if (buf[i] == SEPARATOR) {
+        lastNewLineOffset = i;
+        break;
+      }
+    }
+
+    if (lastNewLineOffset > 0) {
+      int[] offsets_i = { lastNewLineOffset };
+
+      int bytesUsed = lastNewLineOffset + 1; // char at last
+                                             // offset uses a byte
+      assert bytesUsed > 0 : " shouldn't send empty events";
+      ChunkImpl event = new ChunkImpl(type, toWatch.getAbsolutePath(),
+          buffOffsetInFile + bytesUsed, Arrays.copyOf(buf, bytesUsed), this);
+
+      event.setRecordOffsets(offsets_i);
+      eq.add(event);
+
+      return bytesUsed;
+    } else
+      return 0;
+  }
+}

Modified: incubator/chukwa/trunk/src/site/apt/agent.apt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/site/apt/agent.apt?rev=1500353&r1=1500352&r2=1500353&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/site/apt/agent.apt (original)
+++ incubator/chukwa/trunk/src/site/apt/agent.apt Sun Jul  7 02:19:19 2013
@@ -144,6 +144,13 @@ add filetailer.FileTailingAdaptor BarDat
      Chukwa-formatted log files, where exception
      stack traces stay in a single chunk.
 
+  * <<filetailer.FileTailingAdaptorPreserveLines>>
+	Similar to CharFileTailingAdaptorUTF8. The difference with the latter is
+	mainly seen in the Demux Chukwa process: CharFileTailingAdaptorUTF8 will process
+	every line one by one whereas FileTailingAdaptorPreserveLines will process
+	all the lines of a same Chunk in a same go which makes the Demux jobs faster.
+	Same parameters and usage as the above.
+
   * <<DirTailingAdaptor>> Takes a directory path and an
     adaptor name as mandatory parameters; repeatedly scans that directory 
     and all subdirectories, and starts the indicated adaptor running on 

Added: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java?rev=1500353&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java (added)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorPreserveLines.java Sun Jul  7 02:19:19 2013
@@ -0,0 +1,134 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import static org.apache.hadoop.chukwa.util.TempFileUtil.makeTestFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFileTailingAdaptorPreserveLines {
+  private static File testFile;
+  // private static String adaptorToTest = "FileTailingAdaptor";
+  // private static String adaptorToTest = "CharFileTailingAdaptorUTF8";
+  private static String adaptorToTest = "FileTailingAdaptorPreserveLines";
+  private static ChukwaConfiguration conf;
+  private ChukwaAgent agent;
+  private String adaptorId;
+  private ChunkCatcherConnector chunks;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    File baseDir = new File(System.getProperty("test.build.data", "/tmp"));
+    testFile = makeTestFile("TestFileTailingAdaptorPreserveLines", 10,
+        baseDir);
+
+    conf = new ChukwaConfiguration();
+    conf.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 130);
+  }
+
+  /**
+   * @throws Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    agent = new ChukwaAgent(conf);
+    chunks = new ChunkCatcherConnector();
+    chunks.start();
+
+    adaptorId = agent.processAddCommand("add adaptor_test =" + "filetailer."
+        + adaptorToTest + " TestFileTailingAdaptorPreserveLines "
+        + testFile.getCanonicalPath() + " 0");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    agent.stopAdaptor(adaptorId, false);
+    agent.shutdown();
+    chunks.clear();
+    chunks.shutdown();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    if (testFile != null) {
+      testFile.delete();
+    }
+  }
+
+  /**
+   * Check that chunk does not break lines (which is the problem of
+   * FileTailingAdaptor adaptor)
+   * 
+   * @throws UnsupportedEncodingException
+   */
+  @Test
+  public void testDontBreakLines() throws UnsupportedEncodingException {
+    Chunk c = chunks.waitForAChunk(5000);
+    String data = new String(c.getData(), "UTF-8");
+
+    String[] lines = data.split("\\r?\\n");
+
+    // Check that length of the last line is the same as the
+    // one of the first line. Otherwise, it means the last
+    // line has been cut
+    assertEquals(lines[0].length(), lines[lines.length - 1].length());
+  }
+
+  /**
+   * Check that second chunk contains the data that just follow the first
+   * chunk's data
+   * 
+   * @throws UnsupportedEncodingException
+   */
+  @Test
+  public void testSecondChunkDataFollowsFirstChunkData()
+      throws UnsupportedEncodingException {
+    Chunk c = chunks.waitForAChunk(5000);
+    String data = new String(c.getData(), "UTF-8");
+    String[] lines1 = data.split("\\r?\\n");
+
+    c = chunks.waitForAChunk(5000);
+    data = new String(c.getData(), "UTF-8");
+    String[] lines2 = data.split("\\r?\\n");
+
+    int numLastLineChunk1 = (int) (lines1[lines1.length - 1].charAt(0));
+    int numLastLineChunk2 = (int) (lines2[0].charAt(0));
+
+    // Check that lines numbers are successive between
+    // last line of first chunk and first line of second chunk
+    assertEquals(numLastLineChunk1, numLastLineChunk2 - 1);
+  }
+
+  /**
+   * Check that chunk only has one set record offset although it has more than 2
+   * lines (which is the contrary of CharFileTailingAdaptorUTF8)
+   * 
+   * @throws UnsupportedEncodingException
+   */
+  @Test
+  public void testOnlyOneSetRecordOffset()
+      throws UnsupportedEncodingException {
+    Chunk c = chunks.waitForAChunk(5000);
+    String data = new String(c.getData(), "UTF-8");
+    String[] lines = data.split("\\r?\\n");
+
+    // Check that we have more than two lines
+    assertTrue(lines.length > 2);
+
+    int[] offsets_i = c.getRecordOffsets();
+
+    // Check that we only have one offset
+    assertEquals(1, offsets_i.length);
+  }
+}