Posted to issues@hawq.apache.org by sansanichfb <gi...@git.apache.org> on 2017/12/29 02:39:20 UTC

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

GitHub user sansanichfb opened a pull request:

    https://github.com/apache/incubator-hawq/pull/1326

    HAWQ-1575. Implemented readable Parquet profile for PXF.

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sansanichfb/incubator-hawq HAWQ-1575

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-hawq/pull/1326.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1326
    
----
commit f03bbb062b0e1d522dbae6d30d84cd0364f2af1c
Author: Alex Diachenko <od...@...>
Date:   2017-11-30T22:14:26Z

    HAWQ-1575. Implemented readable Parquet profile for PXF.

----


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r165799526
  
    --- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetDataFragmenter.java ---
    @@ -0,0 +1,103 @@
    +package org.apache.hawq.pxf.plugins.hdfs;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * 
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.hadoop.mapred.JobConf;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hawq.pxf.api.Fragment;
    +import org.apache.hawq.pxf.api.Fragmenter;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.ParquetFileReader;
    +import org.apache.parquet.hadoop.ParquetInputFormat;
    +import org.apache.parquet.example.data.Group;
    +import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    +import org.apache.parquet.schema.MessageType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +public class ParquetDataFragmenter extends Fragmenter {
    +    private Job job;
    +
    +    public ParquetDataFragmenter(InputData md) {
    +        super(md);
    +        JobConf jobConf = new JobConf(new Configuration(), ParquetDataFragmenter.class);
    +        try {
    +            job = Job.getInstance(jobConf);
    +        } catch (IOException e) {
    +            throw new RuntimeException("Unable to instantiate a job for reading fragments", e);
    +        }
    +    }
    +
    +
    +    @Override
    +    public List<Fragment> getFragments() throws Exception {
    +        String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
    +        ArrayList<InputSplit> splits = getSplits(new Path(absoluteDataPath));
    +
    +        for (InputSplit split : splits) {
    +            FileSplit fsp = (FileSplit) split;
    +
    +            String filepath = fsp.getPath().toUri().getPath();
    +            String[] hosts = fsp.getLocations();
    +
    +            Path file = new Path(filepath);
    +
    +            ParquetMetadata metadata = ParquetFileReader.readFooter(
    +                    job.getConfiguration(), file, ParquetMetadataConverter.NO_FILTER);
    +            MessageType schema = metadata.getFileMetaData().getSchema();
    +
    +            byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
    --- End diff --
    
    This method is needed to support the `org.apache.hadoop.mapreduce.lib.input.FileSplit` type.
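
    To make the point concrete, here is a minimal sketch (not from the PR) of why the primitive overload helps, assuming the original helper keeps its org.apache.hadoop.mapred.FileSplit parameter; the class and method names below are hypothetical:

        import org.apache.hadoop.mapreduce.lib.input.FileSplit;
        import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;

        import java.io.IOException;

        public class FragmentMetadataSketch {
            // Hypothetical helper, for illustration only.
            public static byte[] metadataFor(FileSplit split) throws IOException {
                // The new-API FileSplit does not extend org.apache.hadoop.mapred.FileSplit,
                // so prepareFragmentMetadata(split) would not compile here. The overload
                // taking primitives serializes the same (start, length, locations) triple
                // regardless of which split flavor produced the values.
                return HdfsUtilities.prepareFragmentMetadata(
                        split.getStart(), split.getLength(), split.getLocations());
            }
        }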


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r166175619
  
    --- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetFileAccessor.java ---
    @@ -0,0 +1,168 @@
    +package org.apache.hawq.pxf.plugins.hdfs;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * 
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapred.FileSplit;
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.api.utilities.Plugin;
    +import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
    +
    +import org.apache.parquet.column.page.PageReadStore;
    +import org.apache.parquet.example.data.Group;
    +import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.ParquetFileReader;
    +import org.apache.parquet.io.ColumnIOFactory;
    +import org.apache.parquet.io.MessageColumnIO;
    +import org.apache.parquet.io.RecordReader;
    +import org.apache.parquet.schema.MessageType;
    +
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +/**
    + * Parquet file accessor.
    --- End diff --
    
    Added 


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by shivzone <gi...@git.apache.org>.
Github user shivzone commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r159297466
  
    --- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetDataFragmenter.java ---
    @@ -0,0 +1,103 @@
    +package org.apache.hawq.pxf.plugins.hdfs;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * 
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.hadoop.mapred.JobConf;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hawq.pxf.api.Fragment;
    +import org.apache.hawq.pxf.api.Fragmenter;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.ParquetFileReader;
    +import org.apache.parquet.hadoop.ParquetInputFormat;
    +import org.apache.parquet.example.data.Group;
    +import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    +import org.apache.parquet.schema.MessageType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +public class ParquetDataFragmenter extends Fragmenter {
    +    private Job job;
    +
    +    public ParquetDataFragmenter(InputData md) {
    +        super(md);
    +        JobConf jobConf = new JobConf(new Configuration(), ParquetDataFragmenter.class);
    +        try {
    +            job = Job.getInstance(jobConf);
    +        } catch (IOException e) {
    +            throw new RuntimeException("Unable to instantiate a job for reading fragments", e);
    +        }
    +    }
    +
    +
    +    @Override
    +    public List<Fragment> getFragments() throws Exception {
    +        String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
    +        ArrayList<InputSplit> splits = getSplits(new Path(absoluteDataPath));
    +
    +        for (InputSplit split : splits) {
    +            FileSplit fsp = (FileSplit) split;
    +
    +            String filepath = fsp.getPath().toUri().getPath();
    +            String[] hosts = fsp.getLocations();
    +
    +            Path file = new Path(filepath);
    +
    +            ParquetMetadata metadata = ParquetFileReader.readFooter(
    +                    job.getConfiguration(), file, ParquetMetadataConverter.NO_FILTER);
    +            MessageType schema = metadata.getFileMetaData().getSchema();
    +
    +            byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
    --- End diff --
    
    Can we simply use `prepareFragmentMetadata(fsp)`?


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by shivzone <gi...@git.apache.org>.
Github user shivzone commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r159298493
  
    --- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetFileAccessor.java ---
    @@ -0,0 +1,168 @@
    +package org.apache.hawq.pxf.plugins.hdfs;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * 
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapred.FileSplit;
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.api.utilities.Plugin;
    +import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
    +
    +import org.apache.parquet.column.page.PageReadStore;
    +import org.apache.parquet.example.data.Group;
    +import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.ParquetFileReader;
    +import org.apache.parquet.io.ColumnIOFactory;
    +import org.apache.parquet.io.MessageColumnIO;
    +import org.apache.parquet.io.RecordReader;
    +import org.apache.parquet.schema.MessageType;
    +
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +/**
    + * Parquet file accessor.
    --- End diff --
    
    Also mention what exactly the accessor call returns (a record, a chunk, etc.).
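
    For instance, the class Javadoc could spell this out along these lines (a suggested wording only; it reflects what the visible code does, where next() wraps a single Parquet Group):

        /**
         * Parquet file accessor. Reads the underlying file one row group at a
         * time and returns a single record per accessor call: a OneRow whose
         * data field holds one Parquet example Group (i.e. one row), rather
         * than a chunk or an entire row group.
         */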


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r159306278
  
    --- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetDataFragmenter.java ---
    @@ -0,0 +1,103 @@
    +package org.apache.hawq.pxf.plugins.hdfs;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * 
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.hadoop.mapred.JobConf;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hawq.pxf.api.Fragment;
    +import org.apache.hawq.pxf.api.Fragmenter;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.ParquetFileReader;
    +import org.apache.parquet.hadoop.ParquetInputFormat;
    +import org.apache.parquet.example.data.Group;
    +import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    +import org.apache.parquet.schema.MessageType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +public class ParquetDataFragmenter extends Fragmenter {
    +    private Job job;
    +
    +    public ParquetDataFragmenter(InputData md) {
    +        super(md);
    +        JobConf jobConf = new JobConf(new Configuration(), ParquetDataFragmenter.class);
    +        try {
    +            job = Job.getInstance(jobConf);
    +        } catch (IOException e) {
    +            throw new RuntimeException("Unable to instantiate a job for reading fragments", e);
    +        }
    +    }
    +
    +
    +    @Override
    +    public List<Fragment> getFragments() throws Exception {
    +        String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
    +        ArrayList<InputSplit> splits = getSplits(new Path(absoluteDataPath));
    --- End diff --
    
    It is usually best to declare the type as List, especially since there are no direct-access calls that would justify narrowing this to ArrayList.
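
    A minimal sketch of the suggested change (same call, wider declared type):

        // Before: needlessly tied to the concrete class
        // ArrayList<InputSplit> splits = getSplits(new Path(absoluteDataPath));

        // After: the interface is enough, since nothing ArrayList-specific
        // (e.g. ensureCapacity) is ever called on the result
        List<InputSplit> splits = getSplits(new Path(absoluteDataPath));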


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by lisakowen <gi...@git.apache.org>.
Github user lisakowen commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r159073951
  
    --- Diff: pxf/pxf-service/src/main/resources/pxf-profiles-default.xml ---
    @@ -184,4 +184,13 @@ under the License.
                 <resolver>org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver</resolver>
             </plugins>
         </profile>
    +    <profile>
    +        <name>Parquet</name>
    --- End diff --
    
    If someone could create another connector/profile that accesses Parquet files from a different external data store, we should consider qualifying the profile name, e.g. HdfsParquet. (Along the same lines, perhaps we should qualify the profiles named Avro and Json.)


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r165782793
  
    --- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetDataFragmenter.java ---
    @@ -0,0 +1,103 @@
    +package org.apache.hawq.pxf.plugins.hdfs;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * 
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.hadoop.mapred.JobConf;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hawq.pxf.api.Fragment;
    +import org.apache.hawq.pxf.api.Fragmenter;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.ParquetFileReader;
    +import org.apache.parquet.hadoop.ParquetInputFormat;
    +import org.apache.parquet.example.data.Group;
    +import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    +import org.apache.parquet.schema.MessageType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +public class ParquetDataFragmenter extends Fragmenter {
    +    private Job job;
    +
    +    public ParquetDataFragmenter(InputData md) {
    +        super(md);
    +        JobConf jobConf = new JobConf(new Configuration(), ParquetDataFragmenter.class);
    +        try {
    +            job = Job.getInstance(jobConf);
    +        } catch (IOException e) {
    +            throw new RuntimeException("Unable to instantiate a job for reading fragments", e);
    +        }
    +    }
    +
    +
    +    @Override
    +    public List<Fragment> getFragments() throws Exception {
    +        String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
    +        ArrayList<InputSplit> splits = getSplits(new Path(absoluteDataPath));
    --- End diff --
    
    Thanks, updated


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r165783402
  
    --- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java ---
    @@ -151,18 +153,42 @@ public static boolean isThreadSafe(String dataDir, String compCodec) {
          * @param fsp file split to be serialized
          * @return byte serialization of fsp
          * @throws IOException if I/O errors occur while writing to the underlying
    -     *             stream
    +     *                     stream
          */
         public static byte[] prepareFragmentMetadata(FileSplit fsp)
                 throws IOException {
    -        ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
    -        ObjectOutputStream objectStream = new ObjectOutputStream(
    -                byteArrayStream);
    -        objectStream.writeLong(fsp.getStart());
    -        objectStream.writeLong(fsp.getLength());
    -        objectStream.writeObject(fsp.getLocations());
    +
    +        return prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
    +
    +    }
    +
    +    public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
    +            throws IOException {
    +
    +        ByteArrayOutputStream byteArrayStream = writeBaseFragmentInfo(start, length, locations);
     
             return byteArrayStream.toByteArray();
    +
    +    }
    +
    +    private static ByteArrayOutputStream writeBaseFragmentInfo(long start, long length, String[] locations) throws IOException {
    +        ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
    +        ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
    +        objectStream.writeLong(start);
    +        objectStream.writeLong(length);
    +        objectStream.writeObject(locations);
    +        return byteArrayStream;
    +    }
    +
    +    public static byte[] prepareFragmentMetadata(long start,
    --- End diff --
    
    Thanks, deleted as unnecessary 


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb closed the pull request at:

    https://github.com/apache/incubator-hawq/pull/1326


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by shivzone <gi...@git.apache.org>.
Github user shivzone commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r159298191
  
    --- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetDataFragmenter.java ---
    @@ -0,0 +1,103 @@
    +package org.apache.hawq.pxf.plugins.hdfs;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * 
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.hadoop.mapred.JobConf;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hawq.pxf.api.Fragment;
    +import org.apache.hawq.pxf.api.Fragmenter;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.ParquetFileReader;
    +import org.apache.parquet.hadoop.ParquetInputFormat;
    +import org.apache.parquet.example.data.Group;
    +import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    +import org.apache.parquet.schema.MessageType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +public class ParquetDataFragmenter extends Fragmenter {
    +    private Job job;
    +
    +    public ParquetDataFragmenter(InputData md) {
    +        super(md);
    +        JobConf jobConf = new JobConf(new Configuration(), ParquetDataFragmenter.class);
    +        try {
    +            job = Job.getInstance(jobConf);
    +        } catch (IOException e) {
    +            throw new RuntimeException("Unable to instantiate a job for reading fragments", e);
    +        }
    +    }
    +
    +
    +    @Override
    --- End diff --
    
    Comments describing the components of the Fragment data would be useful here.
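
    For example, inline comments along these lines (a sketch; the names are taken from the surrounding diff):

        // Each Fragment describes one split of one Parquet file:
        //   filepath         - path of the file this split belongs to
        //   hosts            - datanodes holding the split, for locality-aware scheduling
        //   fragmentMetadata - serialized (start, length, locations) of the split
        //   schema           - Parquet MessageType read from the file footer, so
        //                      downstream plugins need not re-read it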


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by shivzone <gi...@git.apache.org>.
Github user shivzone commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r159297714
  
    --- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java ---
    @@ -151,18 +153,42 @@ public static boolean isThreadSafe(String dataDir, String compCodec) {
          * @param fsp file split to be serialized
          * @return byte serialization of fsp
          * @throws IOException if I/O errors occur while writing to the underlying
    -     *             stream
    +     *                     stream
          */
         public static byte[] prepareFragmentMetadata(FileSplit fsp)
                 throws IOException {
    -        ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
    -        ObjectOutputStream objectStream = new ObjectOutputStream(
    -                byteArrayStream);
    -        objectStream.writeLong(fsp.getStart());
    -        objectStream.writeLong(fsp.getLength());
    -        objectStream.writeObject(fsp.getLocations());
    +
    +        return prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
    +
    +    }
    +
    +    public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
    --- End diff --
    
    This can be made private 


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r159308210
  
    --- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java ---
    @@ -151,18 +153,42 @@ public static boolean isThreadSafe(String dataDir, String compCodec) {
          * @param fsp file split to be serialized
          * @return byte serialization of fsp
          * @throws IOException if I/O errors occur while writing to the underlying
    -     *             stream
    +     *                     stream
          */
         public static byte[] prepareFragmentMetadata(FileSplit fsp)
                 throws IOException {
    -        ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
    -        ObjectOutputStream objectStream = new ObjectOutputStream(
    -                byteArrayStream);
    -        objectStream.writeLong(fsp.getStart());
    -        objectStream.writeLong(fsp.getLength());
    -        objectStream.writeObject(fsp.getLocations());
    +
    +        return prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
    +
    +    }
    +
    +    public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
    --- End diff --
    
    Or, better, incorporate the 2 lines from this function into the parent function, if it is only used once.
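
    Concretely, something like this (a sketch of the suggested inlining, assuming writeBaseFragmentInfo has no other callers):

        public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
                throws IOException {
            // writeBaseFragmentInfo inlined, since this was its only call site
            ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
            ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
            objectStream.writeLong(start);
            objectStream.writeLong(length);
            objectStream.writeObject(locations);
            return byteArrayStream.toByteArray();
        }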


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by shivzone <gi...@git.apache.org>.
Github user shivzone commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r159297386
  
    --- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java ---
    @@ -151,18 +153,42 @@ public static boolean isThreadSafe(String dataDir, String compCodec) {
          * @param fsp file split to be serialized
          * @return byte serialization of fsp
          * @throws IOException if I/O errors occur while writing to the underlying
    -     *             stream
    +     *                     stream
          */
         public static byte[] prepareFragmentMetadata(FileSplit fsp)
                 throws IOException {
    -        ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
    -        ObjectOutputStream objectStream = new ObjectOutputStream(
    -                byteArrayStream);
    -        objectStream.writeLong(fsp.getStart());
    -        objectStream.writeLong(fsp.getLength());
    -        objectStream.writeObject(fsp.getLocations());
    +
    +        return prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
    +
    +    }
    +
    +    public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
    +            throws IOException {
    +
    +        ByteArrayOutputStream byteArrayStream = writeBaseFragmentInfo(start, length, locations);
     
             return byteArrayStream.toByteArray();
    +
    +    }
    +
    +    private static ByteArrayOutputStream writeBaseFragmentInfo(long start, long length, String[] locations) throws IOException {
    +        ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
    +        ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
    +        objectStream.writeLong(start);
    +        objectStream.writeLong(length);
    +        objectStream.writeObject(locations);
    +        return byteArrayStream;
    +    }
    +
    +    public static byte[] prepareFragmentMetadata(long start,
    --- End diff --
    
    I don't see this function used anywhere?


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r166173757
  
    --- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetFileAccessor.java ---
    @@ -0,0 +1,168 @@
    +package org.apache.hawq.pxf.plugins.hdfs;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * 
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapred.FileSplit;
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.api.utilities.Plugin;
    +import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
    +
    +import org.apache.parquet.column.page.PageReadStore;
    +import org.apache.parquet.example.data.Group;
    +import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.ParquetFileReader;
    +import org.apache.parquet.io.ColumnIOFactory;
    +import org.apache.parquet.io.MessageColumnIO;
    +import org.apache.parquet.io.RecordReader;
    +import org.apache.parquet.schema.MessageType;
    +
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +/**
    + * Parquet file accessor.
    + */
    +public class ParquetFileAccessor extends Plugin implements ReadAccessor {
    +    private ParquetFileReader reader;
    +    private MessageColumnIO columnIO;
    +    private RecordIterator recordIterator;
    +    private MessageType schema;
    +
    +
    +    private class RecordIterator implements Iterator<OneRow> {
    +
    +        private final ParquetFileReader reader;
    +        private PageReadStore currentRowGroup;
    +        private RecordReader<Group> recordReader;
    +        private long rowsRemainedInRowGroup;
    +
    +        public RecordIterator(ParquetFileReader reader) {
    +            this.reader = reader;
    +            readNextRowGroup();
    +        }
    +
    +        @Override
    +        public boolean hasNext() {
    +            return rowsRemainedInRowGroup > 0;
    +        }
    +
    +        @Override
    +        public OneRow next() {
    +            return new OneRow(null, readNextGroup());
    +        }
    +
    +        @Override
    +        public void remove() {
    +            throw new UnsupportedOperationException();
    +        }
    +
    +        private void readNextRowGroup() {
    +            try {
    +                currentRowGroup = reader.readNextRowGroup();
    +            } catch (IOException e) {
    +                throw new RuntimeException("Error occurred during reading new row group", e);
    +            }
    +            if (currentRowGroup == null)
    +                return;
    +            rowsRemainedInRowGroup = currentRowGroup.getRowCount();
    +            recordReader = columnIO.getRecordReader(currentRowGroup, new GroupRecordConverter(schema));
    +        }
    +
    +        private Group readNextGroup() {
    +            Group g = null;
    +            if (rowsRemainedInRowGroup == 0) {
    +                readNextRowGroup();
    +                if (currentRowGroup != null) {
    +                    g = recordReader.read();
    +                }
    +            } else {
    +                g = recordReader.read();
    +                if (g == null) {
    --- End diff --
    
    Even though the code looks slightly more complex, we are saving on invoking that method for every single record.
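
    In other words (a simplified sketch of the idea being defended, not the PR's exact code, which is truncated above; the caller treats a null return as end of data):

        private Group readNextGroup() {
            // The long counter lets hasNext() answer with a single comparison,
            // instead of probing the record reader for every record.
            if (rowsRemainedInRowGroup == 0) {
                readNextRowGroup();              // resets the counter for the next group
                if (currentRowGroup == null) {
                    return null;                 // no more row groups: end of file
                }
            }
            rowsRemainedInRowGroup--;
            return recordReader.read();
        }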


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r166173537
  
    --- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java ---
    @@ -151,18 +153,42 @@ public static boolean isThreadSafe(String dataDir, String compCodec) {
          * @param fsp file split to be serialized
          * @return byte serialization of fsp
          * @throws IOException if I/O errors occur while writing to the underlying
    -     *             stream
    +     *                     stream
          */
         public static byte[] prepareFragmentMetadata(FileSplit fsp)
                 throws IOException {
    -        ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
    -        ObjectOutputStream objectStream = new ObjectOutputStream(
    -                byteArrayStream);
    -        objectStream.writeLong(fsp.getStart());
    -        objectStream.writeLong(fsp.getLength());
    -        objectStream.writeObject(fsp.getLocations());
    +
    +        return prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
    +
    +    }
    +
    +    public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
    --- End diff --
    
    Both functions are used, so I would rather keep them both for the sake of compatibility.


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r159308579
  
    --- Diff: pxf/pxf-service/src/scripts/pxf-env.sh ---
    @@ -54,3 +54,5 @@ export HADOOP_DISTRO=${HADOOP_DISTRO}
     # Parent directory of Hadoop client installation (optional)
     # used in case of tarball-based installation when all clients are under a common parent directory
     export HADOOP_ROOT=${HADOOP_ROOT}
    +
    +export CATALINA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
    --- End diff --
    
    This is for debugging and should not be committed!


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r159110011
  
    --- Diff: pxf/pxf-service/src/main/resources/pxf-profiles-default.xml ---
    @@ -184,4 +184,13 @@ under the License.
                 <resolver>org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver</resolver>
             </plugins>
         </profile>
    +    <profile>
    +        <name>Parquet</name>
    --- End diff --
    
    The storage layer is determined in the fragmenter and accessor; the resolver should remain the same. I would leave it as Parquet and reconsider storage-vs-format naming conventions for all profiles later.


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by shivzone <gi...@git.apache.org>.
Github user shivzone commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r159299485
  
    --- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetFileAccessor.java ---
    @@ -0,0 +1,168 @@
    +package org.apache.hawq.pxf.plugins.hdfs;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * 
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapred.FileSplit;
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.api.utilities.Plugin;
    +import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
    +
    +import org.apache.parquet.column.page.PageReadStore;
    +import org.apache.parquet.example.data.Group;
    +import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
    +import org.apache.parquet.format.converter.ParquetMetadataConverter;
    +import org.apache.parquet.hadoop.ParquetFileReader;
    +import org.apache.parquet.io.ColumnIOFactory;
    +import org.apache.parquet.io.MessageColumnIO;
    +import org.apache.parquet.io.RecordReader;
    +import org.apache.parquet.schema.MessageType;
    +
    +import java.io.IOException;
    +import java.util.Iterator;
    +
    +/**
    + * Parquet file accessor.
    + */
    +public class ParquetFileAccessor extends Plugin implements ReadAccessor {
    +    private ParquetFileReader reader;
    +    private MessageColumnIO columnIO;
    +    private RecordIterator recordIterator;
    +    private MessageType schema;
    +
    +
    +    private class RecordIterator implements Iterator<OneRow> {
    +
    +        private final ParquetFileReader reader;
    +        private PageReadStore currentRowGroup;
    +        private RecordReader<Group> recordReader;
    +        private long rowsRemainedInRowGroup;
    +
    +        public RecordIterator(ParquetFileReader reader) {
    +            this.reader = reader;
    +            readNextRowGroup();
    +        }
    +
    +        @Override
    +        public boolean hasNext() {
    +            return rowsRemainedInRowGroup > 0;
    +        }
    +
    +        @Override
    +        public OneRow next() {
    +            return new OneRow(null, readNextGroup());
    +        }
    +
    +        @Override
    +        public void remove() {
    +            throw new UnsupportedOperationException();
    +        }
    +
    +        private void readNextRowGroup() {
    +            try {
    +                currentRowGroup = reader.readNextRowGroup();
    +            } catch (IOException e) {
    +                throw new RuntimeException("Error occurred during reading new row group", e);
    +            }
    +            if (currentRowGroup == null)
    +                return;
    +            rowsRemainedInRowGroup = currentRowGroup.getRowCount();
    +            recordReader = columnIO.getRecordReader(currentRowGroup, new GroupRecordConverter(schema));
    +        }
    +
    +        private Group readNextGroup() {
    +            Group g = null;
    +            if (rowsRemainedInRowGroup == 0) {
    +                readNextRowGroup();
    +                if (currentRowGroup != null) {
    +                    g = recordReader.read();
    +                }
    +            } else {
    +                g = recordReader.read();
    +                if (g == null) {
    --- End diff --
    
    Instead of this, why don't we simply invoke recordReader.read() currentRowGroup.getRowCount() times, rather than decrementing rowsRemainedInRowGroup, which makes this code look complicated?
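
    The suggested alternative would look roughly like this (a hedged sketch of the idea, not code from the PR; it reuses the reader, columnIO and schema fields shown above):

        PageReadStore rowGroup;
        while ((rowGroup = reader.readNextRowGroup()) != null) {
            RecordReader<Group> recordReader =
                    columnIO.getRecordReader(rowGroup, new GroupRecordConverter(schema));
            // Drain the row group with a plain counted loop instead of
            // shared mutable counters:
            for (long i = 0, n = rowGroup.getRowCount(); i < n; i++) {
                Group g = recordReader.read();   // one record per call
                // ... emit g as a OneRow
            }
        }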


---

[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1326#discussion_r165782764
  
    --- Diff: pxf/pxf-service/src/scripts/pxf-env.sh ---
    @@ -54,3 +54,5 @@ export HADOOP_DISTRO=${HADOOP_DISTRO}
     # Parent directory of Hadoop client installation (optional)
     # used in case of tarball-based installation when all clients are under a common parent directory
     export HADOOP_ROOT=${HADOOP_ROOT}
    +
    +export CATALINA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
    --- End diff --
    
    Sure, deleted.


---