Posted to dev@parquet.apache.org by Ryan Blue <rb...@netflix.com.INVALID> on 2019/02/15 19:59:59 UTC

Re: Table.newAppend or newReplacePartitions, local testing with multiple threads facing issue.

Manish,

Please send to the Parquet dev list. The google list is no longer used.

What table implementation are you using? Can you share the log output for
your test?

For your second issue, the problem is that you're reading with an old version
that doesn't support manifest lists. Manifest lists were added to speed up
larger tables and to keep down the size of the root metadata file. You need to
read with a version that supports them.
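
To make that concrete, here is a hand-written sketch of the difference in the
snapshot JSON inside the table metadata file (illustrative only, field names
from memory, paths shortened):

  // without manifest lists: every manifest path is inlined in the root metadata file
  { "snapshot-id": 1, "timestamp-ms": ..., "manifests": [".../m1.avro", ".../m2.avro"] }

  // with manifest lists: a single pointer per snapshot, so the root file stays small
  { "snapshot-id": 2, "timestamp-ms": ..., "manifest-list": ".../snap-2.avro" }

A reader from before manifest lists requires the "manifests" field, which is
why the Preconditions check in your stack trace fails with "Cannot parse
missing list manifests".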

On Thu, Feb 14, 2019 at 10:20 PM Manish Malhotra <
manish.malhotra.work@gmail.com> wrote:

> LOG.info("Failed to load committed snapshot, skipping manifest clean-up");
>
>
> I also see this message being printed in the above scenario.
>
> When I checked the code
>
> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java#L192
>
>
> it looks like the committed snapshot is not being loaded, and therefore the manifests are not merged.
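>
> Roughly, the clean-up step around that line seems to do something like the
> sketch below (my own paraphrase from skimming the code, not the actual
> source; findCommittedSnapshot and cleanUncommittedManifests are made-up names):
>
> // after the commit succeeds, reload the metadata and look up the snapshot that was just committed
> Snapshot committed = findCommittedSnapshot(newSnapshotId);
> if (committed != null) {
>     // clean up manifests written for this update but not referenced by the committed snapshot
>     cleanUncommittedManifests(committed);
> } else {
>     // this is the branch my test seems to hit:
>     LOG.info("Failed to load committed snapshot, skipping manifest clean-up");
> }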
>
>
> Also, when I try to read this table from spark-shell, it throws an exception, again related to the manifest files:
>
>
> scala> val t = hd.load("/tmp/_iceberg/tables/test")
> java.lang.IllegalArgumentException: Cannot parse missing list manifests
>   at com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>   at com.netflix.iceberg.util.JsonUtil.getStringList(JsonUtil.java:89)
>   at com.netflix.iceberg.SnapshotParser.fromJson(SnapshotParser.java:73)
>   at com.netflix.iceberg.TableMetadataParser.fromJson(TableMetadataParser.java:162)
>   at com.netflix.iceberg.TableMetadataParser.read(TableMetadataParser.java:133)
>   at com.netflix.iceberg.hadoop.HadoopTableOperations.refresh(HadoopTableOperations.java:90)
>   at com.netflix.iceberg.hadoop.HadoopTableOperations.current(HadoopTableOperations.java:61)
>   at com.netflix.iceberg.hadoop.HadoopTables.load(HadoopTables.java:59)
>   ... 56 elided
>
> scala> val result = spark.read.format("iceberg").load("/tmp/_iceberg/tables/test");
> java.lang.IllegalArgumentException: Cannot parse missing list manifests
>   at com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>   at com.netflix.iceberg.util.JsonUtil.getStringList(JsonUtil.java:89)
>   at com.netflix.iceberg.SnapshotParser.fromJson(SnapshotParser.java:73)
>   at com.netflix.iceberg.TableMetadataParser.fromJson(TableMetadataParser.java:162)
>   at com.netflix.iceberg.TableMetadataParser.read(TableMetadataParser.java:133)
>   at com.netflix.iceberg.hadoop.HadoopTableOperations.refresh(HadoopTableOperations.java:90)
>   at com.netflix.iceberg.hadoop.HadoopTableOperations.current(HadoopTableOperations.java:61)
>   at com.netflix.iceberg.hadoop.HadoopTables.load(HadoopTables.java:59)
>   at com.netflix.iceberg.spark.source.IcebergSource.findTable(IcebergSource.java:99)
>   at com.netflix.iceberg.spark.source.IcebergSource.createReader(IcebergSource.java:56)
>   at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:155)
>   at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:172)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:204)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
>   ... 56 elided
>
>
> thanks,
>
> - Manish
>
>
>
> On Thursday, February 14, 2019 at 5:47:17 PM UTC-8, Manish Malhotra wrote:
>>
>> Hello,
>>
>> Testing Iceberg with spark using table.newAppend or newReplacePartitions
>> API.
>> The flow of the test is:
>> local file --> rdd --> parquet --> each_partition.files to iceberg.table
>> ( using newAppend or replacePartitions )
>>
>> Not sure if I'm missing something, but when testing with multiple
>> threads in local mode, it gives inconsistent results.
>> If I use 1 thread to create the session, then it works fine, as everything
>> happens serially.
>> Below is the code snippet (which reads data from each day's
>> partition/folder and appends and commits it to the iceberg table).
>>
>>
>> --> Test-Data:
>>
>> #id, value, date (epoch millis)
>>
>> 1,10,1549665714000
>> 2,20,1549665714000
>> 3,30,1549752114000
>> 4,40,1549752114000
>> 5,50,1549838514000
>> 6,60,1549838514000
>> 7,70,1549924914000
>> 8,80,1549924914000
>>
>>
>>
>> --> data on local disk (the partitioned parquet data is created in the test
>> case itself):
>>
>>
>> /tmp/_iceberg/tables/test/test-rdd-parquet/event_date=02082019/part-00000-ed6d68b0-5686-4ea2-9b6f-27f26a25397e.c000.snappy.parquet
>>
>> /tmp/_iceberg/tables/test/test-rdd-parquet/event_date=02092019/part-00000-ed6d68b0-5686-4ea2-9b6f-27f26a25397e.c000.snappy.parquet
>>
>> /tmp/_iceberg/tables/test/test-rdd-parquet/event_date=02102019/part-00000-ed6d68b0-5686-4ea2-9b6f-27f26a25397e.c000.snappy.parquet
>>
>> /tmp/_iceberg/tables/test/test-rdd-parquet/event_date=02112019/part-00000-ed6d68b0-5686-4ea2-9b6f-27f26a25397e.c000.snappy.parquet
>>
>>
>> --> Observation:
>>
>> With 2 threads and 4 partitions,
>> foreach executes 4 times, and each thread/task executes 1 or more
>> partitions.
>> Only the data for whichever partition each thread executes last is
>> actually committed.
>> So the end result is less data than is present in the source parquet
>> files.
>>
>> As soon as I change to a single thread when creating the spark session, it
>> works perfectly.
>> Not sure if I'm missing something, or if in local mode table.commit cannot
>> be used by concurrent threads/tasks.
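>>
>> In other words, stripped down, each foreach task ends up doing the sequence
>> below, concurrently with the other thread (this is just a simplified
>> restatement of the full snippet further down; conf, tablePath and
>> filesForThisPartition are shorthand, not real variables from my code):
>>
>> // what every task does, at the same time as the other task:
>> Table ibTable = new HadoopTables(conf).load(tablePath);  // each task loads its own copy of the table
>> AppendFiles append = ibTable.newAppend();
>> for (DataFile file : filesForThisPartition) {
>>     append.appendFile(file);                             // stage this partition's data files
>> }
>> append.commit();                                         // both tasks reach this at almost the same time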
>>
>> thanks for your time and help !
>>
>>
>> ## how the spark session is created with 2 threads
>> SparkSession sparkSessionTemp = SparkSession.builder().master("local[2]").getOrCreate();
>>
>>
>> /**
>>  * @param table
>>  * @param sourceParquetFilePath
>>  */
>> public static void addDataToIcebergUsingTableAPIParallel(Table table,
>>                                                          String sourceParquetFilePath) {
>>
>>     File srcDir = new File(sourceParquetFilePath);
>>
>>     Collection<File> filesCollection = getSubDirectories(srcDir, "event_date");
>>
>>     List<PartitionDetail> partitionInfoVOS = new ArrayList<>(filesCollection.size());
>>     PartitionDetail partitionInfoVO = null;
>>     Map<String, PartitionPathValue> partitionToDetail = null;
>>     PartitionPathValue partitionPathValue = null;
>>     String value = null;
>>
>>     for (File file : filesCollection) {
>>         value = file.getPath().substring(file.getPath().indexOf("=") + 1, file.getPath().length());
>>         partitionPathValue = new PartitionPathValue(file.getAbsolutePath(), value);
>>         partitionToDetail = new HashMap<>();
>>         partitionToDetail.put("event_date", partitionPathValue);
>>         partitionInfoVO = new PartitionDetail(partitionToDetail, table.location());
>>         partitionInfoVOS.add(partitionInfoVO);
>>     }
>>
>>     JavaRDD<PartitionDetail> distData =
>>             sc.parallelize(Lists.newArrayList(partitionInfoVOS), partitionInfoVOS.size());
>>
>>     distData.foreach(
>>             new VoidFunction<PartitionDetail>() {
>>                 @Override
>>                 public void call(PartitionDetail partitionDetail) throws Exception {
>>
>>                     Map<String, PartitionPathValue> partitionToDetail =
>>                             partitionDetail.getPartitionToDetail();
>>                     Map<String, String> partionColumn = null;
>>                     HadoopTables tables = new HadoopTables(sc.hadoopConfiguration());
>>                     final Table ibTable = tables.load(partitionDetail.getTablePath());
>>                     AppendFiles replacePartitions = ibTable.newAppend();
>>                     Iterator<Map.Entry<String, PartitionPathValue>> iterator =
>>                             partitionToDetail.entrySet().iterator();
>>                     Map.Entry<String, PartitionPathValue> entry = iterator.next();
>>                     PartitionPathValue partitionPathValue = entry.getValue();
>>                     partionColumn = new HashMap<>(1);
>>                     partionColumn.put(entry.getKey(), partitionPathValue.getValue());
>>                     Seq<SparkTableUtil.SparkDataFile> sparkDataFileSeq =
>>                             SparkTableUtil.listPartition(ScalaJavaConverters.toScalaImmutableMap(partionColumn),
>>                                     partitionPathValue.getPath(), "parquet");
>>
>>                     List<SparkTableUtil.SparkDataFile> sparkDataFiles =
>>                             ScalaJavaConverters.toJavaList(sparkDataFileSeq);
>>
>>                     for (SparkTableUtil.SparkDataFile sparkDataFile : sparkDataFiles) {
>>                         try {
>>                             replacePartitions.appendFile(sparkDataFile.toDataFile(ibTable.spec())).apply();
>>                         } catch (Exception e) {
>>                             logger.error(" Exception while adding/committing files to iceberg : {} ",
>>                                     ExceptionUtils.getFullStackTrace(e));
>>                         }
>>                     }
>>
>>                     replacePartitions.commit();
>>
>>                     // to print the files/snapshots.
>>                     final Table ibTable2 = tables.load(partitionDetail.getTablePath());
>>
>>                     Iterable<Snapshot> snapshots = ibTable2.snapshots();
>>
>>                     snapshots.forEach(snapshot -> {
>>                         logger.info("Thread_id: {}, after committing to table, " +
>>                                 "snapshot.addedFiles() : {} ",
>>                                 Thread.currentThread().getId(), snapshot.addedFiles());
>>
>>                         snapshot.addedFiles().forEach(dataFile -> {
>>                             logger.info("Thread_id: {}, after committing to table, snapshot.dataFile() : {} ",
>>                                     Thread.currentThread().getId(), dataFile);
>>                         });
>>                     });
>>                 }
>>             });
>> }
>>


-- 
Ryan Blue
Software Engineer
Netflix