You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by rd...@apache.org on 2010/01/21 19:19:49 UTC

svn commit: r901811 - /hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java

Author: rding
Date: Thu Jan 21 18:19:49 2010
New Revision: 901811

URL: http://svn.apache.org/viewvc?rev=901811&view=rev
Log:
PIG-1090: Update sources to reflect recent changes in load-store interfaces

Added:
    hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java

Added: hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java?rev=901811&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java Thu Jan 21 18:19:49 2010
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.piggybank.storage;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreMetadata;
+import org.apache.pig.backend.datastorage.ContainerDescriptor;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
+import org.apache.pig.backend.hadoop.datastorage.HDirectory;
+import org.apache.pig.backend.hadoop.datastorage.HFile;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Reads and writes metadata using JSON in metafiles next to the data.
+ * <p>
+ * Schemas are stored in <code>.pig_schema</code>, statistics in <code>.pig_stats</code> and,
+ * when header printing is enabled, the field names in <code>.pig_header</code>.
+ * <p>
+ * On the load side metadata is optional: a missing or unreadable metafile is logged
+ * and reported to the caller as <code>null</code>.
+ */
+public class JsonMetadata implements LoadMetadata, StoreMetadata {
+
+    private static final Log log = LogFactory.getLog(JsonMetadata.class);
+
+    // These are not static+final because we may want to make these adjustable by users.
+    private String schemaFileName = ".pig_schema";
+    private String headerFileName = ".pig_header";
+    private String statFileName = ".pig_stats";
+
+    // When true, storeSchema() also writes the field names to the header file.
+    private boolean printHeaders = true;
+
+    // Delimiters used when writing the header file; both are 0 until set via the setters.
+    private byte fieldDel;
+    private byte recordDel;
+
+    /**
+     * Creates an instance that uses the default metafile names.
+     */
+    public JsonMetadata() {
+
+    }
+
+    /**
+     * Given a path, which may represent a glob pattern, a directory, or a file, this method
+     * finds the set of relevant metadata files on the storage system. The algorithm for finding the
+     * metadata file is as follows:
+     * <p>
+     * If the path is a directory, parentPath/prefix is used when it exists. Otherwise,
+     * for each element represented by the path (either directly, or via a glob):
+     *   If the element is a file and parentPath/prefix.fileName exists, use that as the metadata file.
+     *   Else if parentPath/prefix exists, use that as the metadata file.
+     *   If the element is itself a container, element/prefix is used when it exists.
+     * <p>
+     * Resolving conflicts, merging the metadata, etc, is not handled by this method and should be
+     * taken care of by downstream code.
+     * <p>
+     * This can go into a util package if metadata files are considered a general enough pattern.
+     *
+     * @param path      Path, as passed in to a LoadFunc (may be a Hadoop glob)
+     * @param prefix    Metadata file designation, such as .pig_schema or .pig_stats
+     * @param conf      configuration object
+     * @return Set of element descriptors for all metadata files associated with the files on the path;
+     *         empty (never null) when no metadata file exists.
+     * @throws IOException if the storage layer fails while resolving the path
+     */
+    protected Set<ElementDescriptor> findMetaFile(String path, String prefix, Configuration conf)
+        throws IOException {
+        DataStorage storage = new HDataStorage(ConfigurationUtil.toProperties(conf));
+        String fullPath = FileLocalizer.fullPath(path, storage);
+        Set<ElementDescriptor> metaFileSet = new HashSet<ElementDescriptor>();
+
+        if (storage.isContainer(fullPath)) {
+            ElementDescriptor metaFilePath = storage.asElement(fullPath, prefix);
+            if (metaFilePath.exists()) {
+                metaFileSet.add(metaFilePath);
+            }
+            return metaFileSet;
+        }
+
+        for (ElementDescriptor descriptor : storage.asCollection(path)) {
+            ContainerDescriptor container;
+            if (descriptor instanceof HFile) {
+                Path descriptorPath = ((HFile) descriptor).getPath();
+                Path parent = descriptorPath.getParent();
+
+                // if the file has a custom metadata file, use it
+                ElementDescriptor perFileMeta =
+                    storage.asElement(parent.toString(), prefix + "." + descriptorPath.getName());
+                if (perFileMeta.exists()) {
+                    metaFileSet.add(perFileMeta);
+                    continue;
+                }
+                container = new HDirectory((HDataStorage) storage, parent);
+            } else if (descriptor instanceof ContainerDescriptor) {
+                // a glob may match directories; look for the metadata file inside them
+                container = (ContainerDescriptor) descriptor;
+            } else {
+                // Neither a file nor a container: there is no parent to derive, so do not
+                // probe storage with null names.
+                log.warn("Skipping unsupported descriptor while looking for " + prefix + ": " + descriptor);
+                continue;
+            }
+
+            // if no custom metadata file, try the one shared by the whole directory
+            ElementDescriptor sharedMeta = storage.asElement(container, prefix);
+            if (sharedMeta.exists()) {
+                metaFileSet.add(sharedMeta);
+            }
+        }
+        return metaFileSet;
+    }
+
+    /**
+     * Deserializes one JSON metafile, logging (not throwing) on failure.
+     *
+     * @param metaFile  descriptor of the metafile to read
+     * @param type      class to bind the JSON content to
+     * @param what      human readable name of the metadata kind, used in the warning
+     * @param location  data location the metadata belongs to, used in the warning
+     * @return the deserialized object, or null if the file could not be opened or parsed
+     */
+    private <T> T readJson(ElementDescriptor metaFile, Class<T> type, String what, String location) {
+        InputStream in = null;
+        try {
+            in = metaFile.open();
+            return new ObjectMapper().readValue(in, type);
+        } catch (IOException e) {
+            // also covers JsonParseException and JsonMappingException
+            log.warn("Unable to load " + what + " for " + location, e);
+            return null;
+        } finally {
+            if (in != null) {
+                try {
+                    in.close();
+                } catch (IOException e) {
+                    // reading is best effort; a failed close must not surface to the caller
+                    log.debug("Unable to close " + what + " stream for " + location, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Serializes a value as JSON into a newly created metafile and closes the stream.
+     * JSON generation and mapping errors are logged; other I/O errors propagate.
+     *
+     * @param metaFile  descriptor of the metafile to create
+     * @param value     object to serialize
+     * @param what      human readable name of the metadata kind, used in the warning
+     * @param location  data location the metadata belongs to, used in the warning
+     * @throws IOException if the file cannot be created, written or closed
+     */
+    private void writeJson(ElementDescriptor metaFile, Object value, String what, String location)
+        throws IOException {
+        OutputStream os = metaFile.create();
+        try {
+            new ObjectMapper().writeValue(os, value);
+        } catch (JsonGenerationException e) {
+            log.warn("Unable to write " + what + " for " + location, e);
+        } catch (JsonMappingException e) {
+            log.warn("Unable to write " + what + " for " + location, e);
+        } finally {
+            os.close();
+        }
+    }
+
+    //------------------------------------------------------------------------
+    // Implementation of LoadMetaData interface
+
+    /**
+     * This implementation does not report partition keys.
+     *
+     * @return always null
+     */
+    @Override
+    public String[] getPartitionKeys(String location, Configuration conf) {
+        return null;
+    }
+
+    /**
+     * No-op: the supplied partition filter is ignored.
+     */
+    @Override
+    public void setPartitionFilter(Expression partitionFilter)
+            throws IOException {
+    }
+
+    /**
+     * For JsonMetadata schema is considered optional.
+     * This method suppresses (and logs) errors if they are encountered.
+     *
+     * @param location  location of the data whose schema is wanted
+     * @param conf      configuration used to reach the storage system
+     * @return the schema read from the first schema file found, or null if no schema
+     *         file exists or it cannot be read
+     */
+    @Override
+    public ResourceSchema getSchema(String location, Configuration conf) throws IOException {
+        Set<ElementDescriptor> schemaFileSet;
+        try {
+            schemaFileSet = findMetaFile(location, schemaFileName, conf);
+        } catch (IOException e) {
+            log.warn("Could not find schema file for "+ location, e);
+            return null;
+        }
+
+        // TODO we assume that all schemas are the same. The question of merging schemas is left open for now.
+        if (schemaFileSet.isEmpty()) {
+            log.warn("Could not find schema file for "+location);
+            return null;
+        }
+        ElementDescriptor schemaFile = schemaFileSet.iterator().next();
+        log.info("Found schema file: "+schemaFile.toString());
+        return readJson(schemaFile, ResourceSchema.class, "Resource Schema", location);
+    }
+
+    /**
+     * For JsonMetadata stats are considered optional.
+     * This method suppresses (and logs) errors if they are encountered.
+     *
+     * @param location  location of the data whose statistics are wanted
+     * @param conf      configuration used to reach the storage system
+     * @return the statistics read from the first stat file found, or null if no stat
+     *         file exists or it cannot be read
+     * @see org.apache.pig.LoadMetadata#getStatistics(String, Configuration)
+     */
+    @Override
+    public ResourceStatistics getStatistics(String location, Configuration conf) throws IOException {
+        Set<ElementDescriptor> statFileSet;
+        try {
+            statFileSet = findMetaFile(location, statFileName, conf);
+        } catch (IOException e) {
+            log.warn("Could not find stat file for "+location, e);
+            return null;
+        }
+        if (statFileSet.isEmpty()) {
+            log.warn("Could not find stat file for "+location);
+            return null;
+        }
+        ElementDescriptor statFile = statFileSet.iterator().next();
+        log.info("Found stat file "+statFile.toString());
+        return readJson(statFile, ResourceStatistics.class, "Resource Statistics", location);
+    }
+
+    //------------------------------------------------------------------------
+    // Implementation of StoreMetaData interface
+
+    /**
+     * Writes the statistics as JSON to the stat file under <code>location</code>.
+     * Nothing is written when <code>stats</code> is null or the stat file already exists.
+     *
+     * @param stats     statistics to store, may be null
+     * @param location  location the data was stored to
+     * @param conf      configuration used to reach the storage system
+     * @throws IOException if the stat file cannot be created, written or closed
+     */
+    @Override
+    public void storeStatistics(ResourceStatistics stats, String location, Configuration conf) throws IOException {
+        DataStorage storage = new HDataStorage(ConfigurationUtil.toProperties(conf));
+        ElementDescriptor statFilePath = storage.asElement(location, statFileName);
+        if(!statFilePath.exists() && stats != null) {
+            writeJson(statFilePath, stats, "Resource Statistics", location);
+        }
+    }
+
+    /**
+     * Writes the schema as JSON to the schema file under <code>location</code> and, when
+     * header printing is enabled, writes the field names to the header file, separated by
+     * the field delimiter and terminated by the record delimiter.
+     * Existing files are left untouched and nothing is written when <code>schema</code> is null.
+     *
+     * @param schema    schema to store, may be null
+     * @param location  location the data was stored to
+     * @param conf      configuration used to reach the storage system
+     * @throws IOException if a metafile cannot be created, written or closed
+     */
+    @Override
+    public void storeSchema(ResourceSchema schema, String location, Configuration conf) throws IOException {
+        DataStorage storage = new HDataStorage(ConfigurationUtil.toProperties(conf));
+        ElementDescriptor schemaFilePath = storage.asElement(location, schemaFileName);
+        if(!schemaFilePath.exists() && schema != null) {
+            writeJson(schemaFilePath, schema, "Resource Schema", location);
+        }
+        // Without a schema there are no field names, so do not create an empty header file.
+        if (printHeaders && schema != null) {
+            ElementDescriptor headerFilePath = storage.asElement(location, headerFileName);
+            if (!headerFilePath.exists()) {
+                OutputStream os = headerFilePath.create();
+                try {
+                    String[] names = schema.fieldNames();
+
+                    for (int i=0; i < names.length; i++) {
+                        // a field without a name becomes an empty column instead of an NPE
+                        if (names[i] != null) {
+                            os.write(names[i].getBytes("UTF-8"));
+                        }
+                        if (i <names.length-1) {
+                            os.write(fieldDel);
+                        } else {
+                            os.write(recordDel);
+                        }
+                    }
+                } finally {
+                    os.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Sets the byte written between field names in the header file.
+     *
+     * @param fieldDel field delimiter
+     */
+    public void setFieldDel(byte fieldDel) {
+        this.fieldDel = fieldDel;
+    }
+
+    /**
+     * Sets the byte written after the last field name in the header file.
+     *
+     * @param recordDel record delimiter
+     */
+    public void setRecordDel(byte recordDel) {
+        this.recordDel = recordDel;
+    }
+
+}