You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/10/05 09:11:21 UTC

svn commit: r821702 - in /hadoop/chukwa/trunk: CHANGES.txt contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/ChukwaArchive.java contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestArchiveReader.java

Author: asrabkin
Date: Mon Oct  5 07:11:20 2009
New Revision: 821702

URL: http://svn.apache.org/viewvc?rev=821702&view=rev
Log:
CHUKWA-393. Support using pig on Chunks.

Added:
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/ChukwaArchive.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestArchiveReader.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=821702&r1=821701&r2=821702&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Mon Oct  5 07:11:20 2009
@@ -58,6 +58,8 @@
 
   IMPROVEMENTS
 
+    CHUKWA-393. Support using pig on Chunks. (asrabkin)
+
     CHUKWA-392.  FIFO queueing of threads in collector. (asrabkin)
 
     CHUKWA-388.  Clean up user interface color.  (Eric Yang)

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/ChukwaArchive.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/ChukwaArchive.java?rev=821702&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/ChukwaArchive.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/ChukwaArchive.java Mon Oct  5 07:11:20 2009
@@ -0,0 +1,92 @@
+package org.apache.hadoop.chukwa;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.pig.ExecType;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import static org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+public class ChukwaArchive extends Utf8StorageConverter implements LoadFunc {
+
+  private SequenceFile.Reader r = null;
+  private long end = -1;
+
+  private TupleFactory tf = DefaultTupleFactory.getInstance();
+  
+  @Override
+  public void bindTo(String name, BufferedPositionedInputStream arg1,
+      long offset, long end) throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path path = new Path(name);
+    r = new SequenceFile.Reader(fs, path, conf);
+    if(offset > 0)
+      r.sync(offset);
+    this.end = end;
+    
+//    System.out.println("bound to " + name + " at " + offset);
+  }
+  
+
+  @Override
+  public void fieldsToRead(Schema arg0) {
+    //we don't need this; no-op
+  }
+
+  
+  static Schema chukwaArchiveSchema;
+  static int schemaFieldCount;
+  static {
+    chukwaArchiveSchema = new Schema();
+    chukwaArchiveSchema.add(new FieldSchema("seqNo", DataType.LONG));
+    chukwaArchiveSchema.add(new FieldSchema("type", DataType.CHARARRAY));
+    chukwaArchiveSchema.add(new FieldSchema("name", DataType.CHARARRAY));
+    chukwaArchiveSchema.add(new FieldSchema("source", DataType.CHARARRAY));
+    chukwaArchiveSchema.add(new FieldSchema("tags", DataType.CHARARRAY));
+    chukwaArchiveSchema.add(new FieldSchema("data", DataType.BYTEARRAY));
+    schemaFieldCount = chukwaArchiveSchema.size();
+    //do we want to expose the record offsets?
+  }
+
+  @Override
+  public Schema determineSchema(String arg0, ExecType arg1, DataStorage arg2)
+      throws IOException {
+    return chukwaArchiveSchema;
+  }
+
+  @Override
+  public Tuple getNext() throws IOException {
+    
+    ChukwaArchiveKey key = new ChukwaArchiveKey();
+    ChunkImpl val = ChunkImpl.getBlankChunk();
+    if(r.getPosition() > end || !r.next(key, val)) {
+      return null;
+    }
+    Tuple t = tf.newTuple(schemaFieldCount);
+    t.set(0, new Long(val.seqID));
+    t.set(1, val.getDataType());
+    t.set(2, val.getStreamName());
+    t.set(3, val.getSource());
+    t.set(4, val.getTags());
+    t.set(5, val.getData());
+    
+//    System.out.println("returning " + t);
+    return t;
+  }
+
+
+}

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestArchiveReader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestArchiveReader.java?rev=821702&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestArchiveReader.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestArchiveReader.java Mon Oct  5 07:11:20 2009
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.util.GenerateTestFile;
+import org.apache.hadoop.chukwa.util.TempFileUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.pig.*;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.*;
+/**
+ * Note that this test will NOT work if run from eclipse.
+ * 
+ * Pig needs a jarfile, and therefore the test makes fairly strong
+ * assumptions about its environment.  It'll work correctly 
+ * if you do ant test. 
+ *
+ */
+public class TestArchiveReader extends PigTest {
+
+  protected ExecType getExecType() {
+    return ExecType.LOCAL;
+  }
+
+  public void testLocal() {
+
+    File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
+    if (!tempDir.exists()) {
+      tempDir.mkdirs();
+    }
+    String directory = tempDir.getAbsolutePath() + "/TestArchiveChukwaStorage_"
+        + System.currentTimeMillis() + "/";
+    System.out.println(directory);
+    FileSystem fs = null;
+    Configuration conf = null;
+    
+    try {
+      conf = new Configuration();
+      fs = FileSystem.getLocal(conf);
+      Path seqFile = new Path(directory, "test.seq");
+      TempFileUtil.writeASinkFile(conf, fs, seqFile, 10);
+
+     File buildDir = new File(System.getProperty("chukwa.root.build.dir", "../../build/"));
+//     File buildDir = new File(System.getProperty("chukwa.root.build.dir", 
+ //            "/Users/asrabkin/workspace/chukwa_trunk/build"));
+
+      String[] files = buildDir.list();
+      for (String f : files) {
+        if (f.startsWith("chukwa-core") && f.endsWith(".jar")) {
+          log.info("Found" + buildDir.getAbsolutePath() + "/" + f);
+          pigServer.registerJar(buildDir.getAbsolutePath() + "/" + f);
+          break;
+        }
+      }
+      String pigJarDir = System.getProperty("chukwa-pig.build.dir", "../../build/");
+ //     pigJarDir = "/Users/asrabkin/workspace/chukwa_trunk/contrib/chukwa-pig";
+      pigServer.registerJar(pigJarDir + "/chukwa-pig.jar");
+      
+      pigServer.registerQuery("A = load '" + seqFile.toString()
+            + "' using  org.apache.hadoop.chukwa.ChukwaArchive()"
+    //       +" as (ts: long,fields);");
+            + ";");
+     // pigServer.registerQuery("B = FOREACH A GENERATE ts,'myCluster',fields,fields#'csource','myRecord',fields#'csource','myApplication', fields#'A';");
+    //  pigServer.registerQuery("define seqWriter org.apache.hadoop.chukwa.ChukwaStorage('c_timestamp', 'c_cluster' ,'fields','c_pk','c_recordtype','c_source','c_application','myFieldA');");
+    //  pigServer.registerQuery("STORE B into '" + directory
+   //       + "/chukwa-pig.evt' using seqWriter;");
+
+      Schema schema_A = pigServer.dumpSchema("A");
+      assertTrue(schema_A.equals(ChukwaArchive.chukwaArchiveSchema));
+ //     pigServer.explain("A", System.out);
+ 
+//      pigServer.registerQuery("B = DUMP A");
+      pigServer.registerQuery("B = FOREACH A GENERATE seqNo;");
+     
+      Iterator<Tuple> chunks = pigServer.openIterator("B");
+      if(!chunks.hasNext())
+        System.out.println("WARN: I expected to get some seqNos");
+      while(chunks.hasNext()) {
+        System.out.println(chunks.next());
+      }
+      
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    } finally {
+      if (fs != null) {
+        try {
+          fs.delete(new Path(directory), true);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+      pigServer.shutdown();
+    }
+  }
+
+  protected String dumpArchive(FileSystem fs, Configuration conf, String file)
+      throws Throwable {
+    SequenceFile.Reader reader = null;
+    log.info("File: [" +  file + "]" + fs.exists(new Path(file)));
+    try {
+      reader = new SequenceFile.Reader(fs, new Path(file), conf);
+
+      ChukwaRecordKey key = new ChukwaRecordKey();
+      ChukwaRecord record = new ChukwaRecord();
+
+      StringBuilder sb = new StringBuilder();
+      while (reader.next(key, record)) {
+       
+        sb.append("===== KEY   =====");
+
+        sb.append("DataType: " + key.getReduceType());
+        sb.append("Key: " + key.getKey());
+        sb.append("===== Value =====");
+
+        String[] fields = record.getFields();
+        Arrays.sort(fields );
+        sb.append("Timestamp : " + record.getTime());
+        for (String field : fields) {
+          sb.append("[" + field + "] :" + record.getValue(field));
+        }
+      }
+      
+      return sb.toString();
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail("Exception while reading SeqFile" + e.getMessage());
+      throw e;
+    }
+    finally {
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+}