Posted to issues@hawq.apache.org by sansanichfb <gi...@git.apache.org> on 2017/12/29 02:39:20 UTC
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
GitHub user sansanichfb opened a pull request:
https://github.com/apache/incubator-hawq/pull/1326
HAWQ-1575. Implemented readable Parquet profile for PXF.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/sansanichfb/incubator-hawq HAWQ-1575
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-hawq/pull/1326.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1326
----
commit f03bbb062b0e1d522dbae6d30d84cd0364f2af1c
Author: Alex Diachenko <od...@...>
Date: 2017-11-30T22:14:26Z
HAWQ-1575. Implemented readable Parquet profile for PXF.
----
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r165799526
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetDataFragmenter.java ---
@@ -0,0 +1,103 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ParquetDataFragmenter extends Fragmenter {
+ private Job job;
+
+ public ParquetDataFragmenter(InputData md) {
+ super(md);
+ JobConf jobConf = new JobConf(new Configuration(), ParquetDataFragmenter.class);
+ try {
+ job = Job.getInstance(jobConf);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to instantiate a job for reading fragments", e);
+ }
+ }
+
+
+ @Override
+ public List<Fragment> getFragments() throws Exception {
+ String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
+ ArrayList<InputSplit> splits = getSplits(new Path(absoluteDataPath));
+
+ for (InputSplit split : splits) {
+ FileSplit fsp = (FileSplit) split;
+
+ String filepath = fsp.getPath().toUri().getPath();
+ String[] hosts = fsp.getLocations();
+
+ Path file = new Path(filepath);
+
+ ParquetMetadata metadata = ParquetFileReader.readFooter(
+ job.getConfiguration(), file, ParquetMetadataConverter.NO_FILTER);
+ MessageType schema = metadata.getFileMetaData().getSchema();
+
+ byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
--- End diff --
This method is needed to support the `org.apache.hadoop.mapreduce.lib.input.FileSplit` type.
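A minimal sketch of the type mismatch, for context (assuming the existing `prepareFragmentMetadata(FileSplit)` helper keeps its old-API `org.apache.hadoop.mapred.FileSplit` parameter, as shown in the HdfsUtilities diff elsewhere in this thread):

    FileSplit fsp = (FileSplit) split;  // org.apache.hadoop.mapreduce.lib.input.FileSplit (new API)
    // HdfsUtilities.prepareFragmentMetadata(fsp);  // would not compile: that
    // overload's parameter type is org.apache.hadoop.mapred.FileSplit (old API)
    byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(
            fsp.getStart(), fsp.getLength(), fsp.getLocations());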
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r166175619
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetFileAccessor.java ---
@@ -0,0 +1,168 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Parquet file accessor.
--- End diff --
Added
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by shivzone <gi...@git.apache.org>.
Github user shivzone commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r159297466
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetDataFragmenter.java ---
@@ -0,0 +1,103 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ParquetDataFragmenter extends Fragmenter {
+ private Job job;
+
+ public ParquetDataFragmenter(InputData md) {
+ super(md);
+ JobConf jobConf = new JobConf(new Configuration(), ParquetDataFragmenter.class);
+ try {
+ job = Job.getInstance(jobConf);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to instantiate a job for reading fragments", e);
+ }
+ }
+
+
+ @Override
+ public List<Fragment> getFragments() throws Exception {
+ String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
+ ArrayList<InputSplit> splits = getSplits(new Path(absoluteDataPath));
+
+ for (InputSplit split : splits) {
+ FileSplit fsp = (FileSplit) split;
+
+ String filepath = fsp.getPath().toUri().getPath();
+ String[] hosts = fsp.getLocations();
+
+ Path file = new Path(filepath);
+
+ ParquetMetadata metadata = ParquetFileReader.readFooter(
+ job.getConfiguration(), file, ParquetMetadataConverter.NO_FILTER);
+ MessageType schema = metadata.getFileMetaData().getSchema();
+
+ byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
--- End diff --
Can we simply use `prepareFragmentMetadata(fsp)`?
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by shivzone <gi...@git.apache.org>.
Github user shivzone commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r159298493
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetFileAccessor.java ---
@@ -0,0 +1,168 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Parquet file accessor.
--- End diff --
Also mention what exactly the accessor call returns (a record, a chunk, etc.).
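For example, the class Javadoc could say something like this (a sketch only, assuming the standard `ReadAccessor` contract of one `OneRow` per `readNextObject()` call; the exact wording is up to the author):

    /**
     * Parquet file accessor. Reads a Parquet file split record by record:
     * each accessor call returns a single row, wrapped as a OneRow whose
     * data field is a Parquet example Group, not a chunk of rows.
     */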
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r159306278
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetDataFragmenter.java ---
@@ -0,0 +1,103 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ParquetDataFragmenter extends Fragmenter {
+ private Job job;
+
+ public ParquetDataFragmenter(InputData md) {
+ super(md);
+ JobConf jobConf = new JobConf(new Configuration(), ParquetDataFragmenter.class);
+ try {
+ job = Job.getInstance(jobConf);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to instantiate a job for reading fragments", e);
+ }
+ }
+
+
+ @Override
+ public List<Fragment> getFragments() throws Exception {
+ String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
+ ArrayList<InputSplit> splits = getSplits(new Path(absoluteDataPath));
--- End diff --
Usually best to declare the type as `List`, especially since there are no direct access calls that would justify narrowing this to `ArrayList`.
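I.e., a sketch of the suggested declaration:

    // Program to the interface; the concrete ArrayList is an
    // implementation detail of getSplits() and callers only iterate.
    List<InputSplit> splits = getSplits(new Path(absoluteDataPath));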
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by lisakowen <gi...@git.apache.org>.
Github user lisakowen commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r159073951
--- Diff: pxf/pxf-service/src/main/resources/pxf-profiles-default.xml ---
@@ -184,4 +184,13 @@ under the License.
<resolver>org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver</resolver>
</plugins>
</profile>
+ <profile>
+ <name>Parquet</name>
--- End diff --
If someone could create another connector/profile that accesses Parquet files from a different external data store, we should consider qualifying the profile name, e.g. HdfsParquet. (Along the same lines, perhaps we should qualify the profiles named Avro and Json.)
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r165782793
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetDataFragmenter.java ---
@@ -0,0 +1,103 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ParquetDataFragmenter extends Fragmenter {
+ private Job job;
+
+ public ParquetDataFragmenter(InputData md) {
+ super(md);
+ JobConf jobConf = new JobConf(new Configuration(), ParquetDataFragmenter.class);
+ try {
+ job = Job.getInstance(jobConf);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to instantiate a job for reading fragments", e);
+ }
+ }
+
+
+ @Override
+ public List<Fragment> getFragments() throws Exception {
+ String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
+ ArrayList<InputSplit> splits = getSplits(new Path(absoluteDataPath));
--- End diff --
Thanks, updated
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r165783402
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java ---
@@ -151,18 +153,42 @@ public static boolean isThreadSafe(String dataDir, String compCodec) {
* @param fsp file split to be serialized
* @return byte serialization of fsp
* @throws IOException if I/O errors occur while writing to the underlying
- * stream
+ * stream
*/
public static byte[] prepareFragmentMetadata(FileSplit fsp)
throws IOException {
- ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
- ObjectOutputStream objectStream = new ObjectOutputStream(
- byteArrayStream);
- objectStream.writeLong(fsp.getStart());
- objectStream.writeLong(fsp.getLength());
- objectStream.writeObject(fsp.getLocations());
+
+ return prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
+
+ }
+
+ public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
+ throws IOException {
+
+ ByteArrayOutputStream byteArrayStream = writeBaseFragmentInfo(start, length, locations);
return byteArrayStream.toByteArray();
+
+ }
+
+ private static ByteArrayOutputStream writeBaseFragmentInfo(long start, long length, String[] locations) throws IOException {
+ ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
+ ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
+ objectStream.writeLong(start);
+ objectStream.writeLong(length);
+ objectStream.writeObject(locations);
+ return byteArrayStream;
+ }
+
+ public static byte[] prepareFragmentMetadata(long start,
--- End diff --
Thanks, deleted as unnecessary
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb closed the pull request at:
https://github.com/apache/incubator-hawq/pull/1326
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by shivzone <gi...@git.apache.org>.
Github user shivzone commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r159298191
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetDataFragmenter.java ---
@@ -0,0 +1,103 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ParquetDataFragmenter extends Fragmenter {
+ private Job job;
+
+ public ParquetDataFragmenter(InputData md) {
+ super(md);
+ JobConf jobConf = new JobConf(new Configuration(), ParquetDataFragmenter.class);
+ try {
+ job = Job.getInstance(jobConf);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to instantiate a job for reading fragments", e);
+ }
+ }
+
+
+ @Override
--- End diff --
Comments describing the components of the Fragment data would be useful here.
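As a sketch of the kind of comment that would help (the field meanings here are inferred from the diff, and the user-data part is an assumption since the hunk is truncated):

    /**
     * Splits the data source into fragments. Each Fragment is built from a
     * FileSplit and carries: the file path, the hosts holding the split,
     * serialized split metadata (start offset, length, locations), and,
     * presumably, the file's Parquet schema for use by the resolver.
     */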
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by shivzone <gi...@git.apache.org>.
Github user shivzone commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r159297714
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java ---
@@ -151,18 +153,42 @@ public static boolean isThreadSafe(String dataDir, String compCodec) {
* @param fsp file split to be serialized
* @return byte serialization of fsp
* @throws IOException if I/O errors occur while writing to the underlying
- * stream
+ * stream
*/
public static byte[] prepareFragmentMetadata(FileSplit fsp)
throws IOException {
- ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
- ObjectOutputStream objectStream = new ObjectOutputStream(
- byteArrayStream);
- objectStream.writeLong(fsp.getStart());
- objectStream.writeLong(fsp.getLength());
- objectStream.writeObject(fsp.getLocations());
+
+ return prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
+
+ }
+
+ public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
--- End diff --
This can be made private
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r159308210
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java ---
@@ -151,18 +153,42 @@ public static boolean isThreadSafe(String dataDir, String compCodec) {
* @param fsp file split to be serialized
* @return byte serialization of fsp
* @throws IOException if I/O errors occur while writing to the underlying
- * stream
+ * stream
*/
public static byte[] prepareFragmentMetadata(FileSplit fsp)
throws IOException {
- ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
- ObjectOutputStream objectStream = new ObjectOutputStream(
- byteArrayStream);
- objectStream.writeLong(fsp.getStart());
- objectStream.writeLong(fsp.getLength());
- objectStream.writeObject(fsp.getLocations());
+
+ return prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
+
+ }
+
+ public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
--- End diff --
Or better, incorporate the 2 lines from this function into the parent function, if it is only used once.
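I.e., a sketch with the helper inlined (assuming `writeBaseFragmentInfo` has no other callers):

    public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
            throws IOException {
        ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
        ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
        objectStream.writeLong(start);
        objectStream.writeLong(length);
        objectStream.writeObject(locations);
        return byteArrayStream.toByteArray();
    }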
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by shivzone <gi...@git.apache.org>.
Github user shivzone commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r159297386
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java ---
@@ -151,18 +153,42 @@ public static boolean isThreadSafe(String dataDir, String compCodec) {
* @param fsp file split to be serialized
* @return byte serialization of fsp
* @throws IOException if I/O errors occur while writing to the underlying
- * stream
+ * stream
*/
public static byte[] prepareFragmentMetadata(FileSplit fsp)
throws IOException {
- ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
- ObjectOutputStream objectStream = new ObjectOutputStream(
- byteArrayStream);
- objectStream.writeLong(fsp.getStart());
- objectStream.writeLong(fsp.getLength());
- objectStream.writeObject(fsp.getLocations());
+
+ return prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
+
+ }
+
+ public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
+ throws IOException {
+
+ ByteArrayOutputStream byteArrayStream = writeBaseFragmentInfo(start, length, locations);
return byteArrayStream.toByteArray();
+
+ }
+
+ private static ByteArrayOutputStream writeBaseFragmentInfo(long start, long length, String[] locations) throws IOException {
+ ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
+ ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
+ objectStream.writeLong(start);
+ objectStream.writeLong(length);
+ objectStream.writeObject(locations);
+ return byteArrayStream;
+ }
+
+ public static byte[] prepareFragmentMetadata(long start,
--- End diff --
I don't see this function used anywhere?
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r166173757
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetFileAccessor.java ---
@@ -0,0 +1,168 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Parquet file accessor.
+ */
+public class ParquetFileAccessor extends Plugin implements ReadAccessor {
+ private ParquetFileReader reader;
+ private MessageColumnIO columnIO;
+ private RecordIterator recordIterator;
+ private MessageType schema;
+
+
+ private class RecordIterator implements Iterator<OneRow> {
+
+ private final ParquetFileReader reader;
+ private PageReadStore currentRowGroup;
+ private RecordReader<Group> recordReader;
+ private long rowsRemainedInRowGroup;
+
+ public RecordIterator(ParquetFileReader reader) {
+ this.reader = reader;
+ readNextRowGroup();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return rowsRemainedInRowGroup > 0;
+ }
+
+ @Override
+ public OneRow next() {
+ return new OneRow(null, readNextGroup());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ private void readNextRowGroup() {
+ try {
+ currentRowGroup = reader.readNextRowGroup();
+ } catch (IOException e) {
+ throw new RuntimeException("Error occurred during reading new row group", e);
+ }
+ if (currentRowGroup == null)
+ return;
+ rowsRemainedInRowGroup = currentRowGroup.getRowCount();
+ recordReader = columnIO.getRecordReader(currentRowGroup, new GroupRecordConverter(schema));
+ }
+
+ private Group readNextGroup() {
+ Group g = null;
+ if (rowsRemainedInRowGroup == 0) {
+ readNextRowGroup();
+ if (currentRowGroup != null) {
+ g = recordReader.read();
+ }
+ } else {
+ g = recordReader.read();
+ if (g == null) {
--- End diff --
Even though the code looks slightly more complex, we are saving on invoking that method for every single record.
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r166173537
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java ---
@@ -151,18 +153,42 @@ public static boolean isThreadSafe(String dataDir, String compCodec) {
* @param fsp file split to be serialized
* @return byte serialization of fsp
* @throws IOException if I/O errors occur while writing to the underlying
- * stream
+ * stream
*/
public static byte[] prepareFragmentMetadata(FileSplit fsp)
throws IOException {
- ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
- ObjectOutputStream objectStream = new ObjectOutputStream(
- byteArrayStream);
- objectStream.writeLong(fsp.getStart());
- objectStream.writeLong(fsp.getLength());
- objectStream.writeObject(fsp.getLocations());
+
+ return prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
+
+ }
+
+ public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
--- End diff --
Both functions are used, so I would rather keep them both for the sake of compatibility.
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r159308579
--- Diff: pxf/pxf-service/src/scripts/pxf-env.sh ---
@@ -54,3 +54,5 @@ export HADOOP_DISTRO=${HADOOP_DISTRO}
# Parent directory of Hadoop client installation (optional)
# used in case of tarball-based installation when all clients are under a common parent directory
export HADOOP_ROOT=${HADOOP_ROOT}
+
+export CATALINA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
--- End diff --
This is for debugging and should not be committed!
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r159110011
--- Diff: pxf/pxf-service/src/main/resources/pxf-profiles-default.xml ---
@@ -184,4 +184,13 @@ under the License.
<resolver>org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver</resolver>
</plugins>
</profile>
+ <profile>
+ <name>Parquet</name>
--- End diff --
The storage layer is determined by the fragmenter and accessor; the resolver should remain the same. I would leave it as Parquet and reconsider storage-vs-format naming conventions for all profiles later.
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by shivzone <gi...@git.apache.org>.
Github user shivzone commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r159299485
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetFileAccessor.java ---
@@ -0,0 +1,168 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Parquet file accessor.
+ */
+public class ParquetFileAccessor extends Plugin implements ReadAccessor {
+ private ParquetFileReader reader;
+ private MessageColumnIO columnIO;
+ private RecordIterator recordIterator;
+ private MessageType schema;
+
+
+ private class RecordIterator implements Iterator<OneRow> {
+
+ private final ParquetFileReader reader;
+ private PageReadStore currentRowGroup;
+ private RecordReader<Group> recordReader;
+ private long rowsRemainedInRowGroup;
+
+ public RecordIterator(ParquetFileReader reader) {
+ this.reader = reader;
+ readNextRowGroup();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return rowsRemainedInRowGroup > 0;
+ }
+
+ @Override
+ public OneRow next() {
+ return new OneRow(null, readNextGroup());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ private void readNextRowGroup() {
+ try {
+ currentRowGroup = reader.readNextRowGroup();
+ } catch (IOException e) {
+ throw new RuntimeException("Error occurred during reading new row group", e);
+ }
+ if (currentRowGroup == null)
+ return;
+ rowsRemainedInRowGroup = currentRowGroup.getRowCount();
+ recordReader = columnIO.getRecordReader(currentRowGroup, new GroupRecordConverter(schema));
+ }
+
+ private Group readNextGroup() {
+ Group g = null;
+ if (rowsRemainedInRowGroup == 0) {
+ readNextRowGroup();
+ if (currentRowGroup != null) {
+ g = recordReader.read();
+ }
+ } else {
+ g = recordReader.read();
+ if (g == null) {
--- End diff --
Instead of this, why don't we simply invoke `recordReader.read()` `currentRowGroup.getRowCount()` times, rather than decrementing `rowsRemainedInRowGroup`, which makes this code look complicated?
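A sketch of that alternative shape (hypothetical, not tested; fitting an eager loop like this into the pull-based RecordIterator is presumably what motivates the counter in the current code):

    // Drive each row group with a counted loop instead of a decrementing field:
    PageReadStore rowGroup;
    while ((rowGroup = reader.readNextRowGroup()) != null) {
        RecordReader<Group> rr =
                columnIO.getRecordReader(rowGroup, new GroupRecordConverter(schema));
        for (long i = 0, n = rowGroup.getRowCount(); i < n; i++) {
            Group g = rr.read();  // exactly one record per call
            // ... emit g as a OneRow
        }
    }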
---
[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...
Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1326#discussion_r165782764
--- Diff: pxf/pxf-service/src/scripts/pxf-env.sh ---
@@ -54,3 +54,5 @@ export HADOOP_DISTRO=${HADOOP_DISTRO}
# Parent directory of Hadoop client installation (optional)
# used in case of tarball-based installation when all clients are under a common parent directory
export HADOOP_ROOT=${HADOOP_ROOT}
+
+export CATALINA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
--- End diff --
Sure, deleted.
---