Posted to issues@kudu.apache.org by "Grant Henke (JIRA)" <ji...@apache.org> on 2018/06/07 22:27:00 UTC
[jira] [Resolved] (KUDU-1820) Improve KuduContext with Range Partitioning
[ https://issues.apache.org/jira/browse/KUDU-1820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Grant Henke resolved KUDU-1820.
-------------------------------
Resolution: Duplicate
Fix Version/s: n/a
> Improve KuduContext with Range Partitioning
> -------------------------------------------
>
> Key: KUDU-1820
> URL: https://issues.apache.org/jira/browse/KUDU-1820
> Project: Kudu
> Issue Type: Improvement
> Components: spark
> Affects Versions: 1.0.1
> Reporter: Ryan Bosshart
> Assignee: Cam Quoc Mach
> Priority: Minor
> Fix For: n/a
>
>
> With hash partitions, I use the KuduContext like so:
> kuduContext.createTable(modifiedTable, predictions.schema, Seq("movieid", "userid"),
>   new CreateTableOptions().addHashPartitions(ImmutableList.of("movieid"), 3).setNumReplicas(1))
> There isn't a clean way to use KuduContext with range partitions, however. I have it working below, but the schema has to be defined twice in different formats: the range bounds passed to CreateTableOptions are PartialRows built from a Kudu Schema (a list of ColumnSchema), while KuduContext.createTable takes a Spark StructType.
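> import com.google.common.collect.ImmutableList
> import org.apache.kudu.{Schema, Type}
> import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
> import org.apache.kudu.client.CreateTableOptions
> import org.apache.kudu.spark.kudu.KuduContext
> import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
> import org.joda.time.DateTime
>
> // Kudu-native schema, used only to build the PartialRow range bounds.
> val fixSchema: Schema = {
>   val columns = ImmutableList.of(
>     new ColumnSchemaBuilder("clordid", Type.STRING).key(true).build(),
>     new ColumnSchemaBuilder("transacttime", Type.INT64).key(true).build(),
>     ....
>     new ColumnSchemaBuilder("lastupdated", Type.INT64).key(false).nullable(true).build())
>   new Schema(columns)
> }
> // Spark schema for the same columns, passed to KuduContext.createTable.
> val schema =
>   StructType(
>     StructField("clordid", StringType, false) ::
>     StructField("transacttime", LongType, false) ::
>     ....
>     StructField("lastupdated", LongType, true) :: Nil)
> val kuduContext = new KuduContext(kuduMaster)
> val options = new CreateTableOptions()
>   .setRangePartitionColumns(ImmutableList.of("transacttime"))
>   .addHashPartitions(ImmutableList.of("clordid"), 3)
>   .setNumReplicas(1)
> val today = new DateTime().withTimeAtStartOfDay()
> // One range partition per day on transacttime.
> for (i <- 0 until numberOfDays) {
>   val lbMillis = today.plusDays(i).getMillis
>   // addRangePartition treats the upper bound as exclusive, so use the next
>   // day's start (not "- 1") to avoid a one-millisecond gap between days.
>   val ubMillis = today.plusDays(i + 1).getMillis
>   val lowerBound = fixSchema.newPartialRow()
>   lowerBound.addLong("transacttime", lbMillis)
>   val upperBound = fixSchema.newPartialRow()
>   upperBound.addLong("transacttime", ubMillis)
>   options.addRangePartition(lowerBound, upperBound)
> }
> kuduContext.createTable(tableName, schema, Seq("clordid", "transacttime"), options)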
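The daily bounds in the loop above can be checked in isolation. In the Kudu Java client, CreateTableOptions.addRangePartition(lower, upper) uses an inclusive lower bound and an exclusive upper bound, so consecutive daily partitions line up without gaps when each upper bound equals the next day's lower bound. The following is a minimal sketch of just that arithmetic, assuming UTC day boundaries (the issue's code uses Joda-Time in the local zone); DailyRangeBounds and dailyBounds are hypothetical names, not part of Kudu or KuduContext.

```scala
import java.time.{LocalDate, ZoneOffset}
import java.util.concurrent.TimeUnit

// Hypothetical helper (not a Kudu API): computes half-open [lower, upper)
// millisecond bounds for one range partition per day, assuming UTC days.
object DailyRangeBounds {
  val DayInMillis: Long = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS)

  def dailyBounds(start: LocalDate, numberOfDays: Int): Seq[(Long, Long)] = {
    val startMillis = start.atStartOfDay(ZoneOffset.UTC).toInstant.toEpochMilli
    (0 until numberOfDays).map { i =>
      val lower = startMillis + i * DayInMillis
      // Exclusive upper bound: equals the next day's lower bound, so there is no gap.
      (lower, lower + DayInMillis)
    }
  }

  def main(args: Array[String]): Unit = {
    val bounds = dailyBounds(LocalDate.of(2018, 6, 7), 3)
    bounds.foreach { case (lo, hi) => println(s"[$lo, $hi)") }
  }
}
```

Each pair would then be written into the lower and upper PartialRows exactly as in the loop from the issue. Because UTC has no daylight-saving shifts, adding a fixed day length is safe here; in a local time zone, taking the next day's start from the calendar is the safer choice.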
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)