You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@arrow.apache.org by "José F (Jira)" <ji...@apache.org> on 2022/01/20 21:20:00 UTC

[jira] [Updated] (ARROW-15397) Problem with Join in apache arrow in R

     [ https://issues.apache.org/jira/browse/ARROW-15397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

José F updated ARROW-15397:
---------------------------
    Description: 
Hi dear arrow developers. I tested inner_join with the arrow R package, but R crashed. Here is my example using the toy dataset iris:

 

# Packages used below: arrow (open_dataset), dplyr (%>%, select, inner_join,
# group_by, summarise, collect) and purrr (walk)
library(arrow)
library(dplyr)
library(purrr)

data(iris)
write.csv(iris, "iris.csv") # write csv file

# write parquet files with the write_chunk_data function (below);
# that function must be defined before this call is run

walk("C:/Users/Stats/Desktop/ejemplo_join/iris.csv",
     write_chunk_data, "C:/Users/Stats/Desktop/ejemplo_join/parquet", chunk_size = 50)

iris_arrow <- open_dataset("parquet")

# `...1` is used as the join key; presumably it is the unnamed row-name
# column written by write.csv() -- TODO confirm
df1_arrow <- iris_arrow %>% select(`...1`, Sepal.Length, Sepal.Width, Petal.Length)
df2_arrow <- iris_arrow %>% select(`...1`, Petal.Width, Species)

# Join the two column subsets back together, then average by species
df <- df1_arrow %>%
  inner_join(df2_arrow, by = "...1") %>%
  group_by(Species) %>%
  summarise(prom = mean(Sepal.Length)) %>%
  collect()
print(df)

 

 

# Please run this function first; it writes the parquet files used in this example

 write_chunk_data <- function(data_path, output_dir, chunk_size = 1000000) {
  #If the output_dir do not exist, it is created
  if (!fs::dir_exists(output_dir)) fs::dir_create(output_dir)
  #It gets the name of the file
  data_name <- fs::path_ext_remove(fs::path_file(data_path))
  #It sets the chunk_num to 0
  chunk_num <- 0
  #Read the file using vroom
  data_chunk <- vroom::vroom(data_path)
  #It gets the variable names
  data_names <- names(data_chunk)
  #It gets the number of rows
  rows<-nrow(data_chunk)
  
  #The following loop creates a parquet file for every [chunk_size] rows
  repeat{
    #It checks if we are over the max rows
    if(rows>(chunk_num+1)*chunk_size)

{       arrow::write_parquet(data_chunk[(chunk_num*chunk_size+1):((chunk_num+1)*chunk_size),],                             fs::path(output_dir, glue::glue("\\{data_name}

-\{chunk_num}.parquet")))
    }
    else

{       arrow::write_parquet(data_chunk[(chunk_num*chunk_size+1):rows,],                             fs::path(output_dir, glue::glue("\\{data_name}

-\{chunk_num}.parquet"))) 
      break
    }
    chunk_num <- chunk_num + 1
  }
   
  #This is to recover some memory and space in the disk
  rm(data_chunk)
  tmp_file <- tempdir()
  files <- list.files(tmp_file, full.names = T, pattern = "^vroom")
  file.remove(files)
}

 

  was:
Hi dear arrow developers. I tested inner_join with the arrow R package, but R crashed. Here is my example using the toy dataset iris:

 

# Packages used below: arrow (open_dataset), dplyr (%>%, select, inner_join,
# group_by, summarise, collect) and purrr (walk)
library(arrow)
library(dplyr)
library(purrr)

data(iris)
write.csv(iris, "iris.csv") # write csv file

# write parquet files with write_chunk_data (at the end for interested users);
# that function must be defined before this call is run

walk("C:/Users/Stats/Desktop/ejemplo_join/iris.csv",
     write_chunk_data, "C:/Users/Stats/Desktop/ejemplo_join/parquet", chunk_size = 50)

iris_arrow <- open_dataset("parquet")

# `...1` is used as the join key; presumably it is the unnamed row-name
# column written by write.csv() -- TODO confirm
df1_arrow <- iris_arrow %>% select(`...1`, Sepal.Length, Sepal.Width, Petal.Length)
df2_arrow <- iris_arrow %>% select(`...1`, Petal.Width, Species)

# Join the two column subsets back together, then average by species
df <- df1_arrow %>%
  inner_join(df2_arrow, by = "...1") %>%
  group_by(Species) %>%
  summarise(prom = mean(Sepal.Length)) %>%
  collect()
print(df)

 

 

# If you want to check the function that writes parquet files from a csv:

 write_chunk_data <- function(data_path, output_dir, chunk_size = 1000000) {
  #If the output_dir do not exist, it is created
  if (!fs::dir_exists(output_dir)) fs::dir_create(output_dir)
  #It gets the name of the file
  data_name <- fs::path_ext_remove(fs::path_file(data_path))
  #It sets the chunk_num to 0
  chunk_num <- 0
  #Read the file using vroom
  data_chunk <- vroom::vroom(data_path)
  #It gets the variable names
  data_names <- names(data_chunk)
  #It gets the number of rows
  rows<-nrow(data_chunk)
  
  #The following loop creates a parquet file for every [chunk_size] rows
  repeat{
    #It checks if we are over the max rows
    if(rows>(chunk_num+1)*chunk_size){
      arrow::write_parquet(data_chunk[(chunk_num*chunk_size+1):((chunk_num+1)*chunk_size),], 
                           fs::path(output_dir, glue::glue("\{data_name}-\{chunk_num}.parquet")))
    }
    else{
      arrow::write_parquet(data_chunk[(chunk_num*chunk_size+1):rows,], 
                           fs::path(output_dir, glue::glue("\{data_name}-\{chunk_num}.parquet"))) 
      break
    }
    chunk_num <- chunk_num + 1
  }
   
  #This is to recover some memory and space in the disk
  rm(data_chunk)
  tmp_file <- tempdir()
  files <- list.files(tmp_file, full.names = T, pattern = "^vroom")
  file.remove(files)
}

 


> Problem with Join in apache arrow in R
> --------------------------------------
>
>                 Key: ARROW-15397
>                 URL: https://issues.apache.org/jira/browse/ARROW-15397
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: R
>    Affects Versions: 6.0.1
>            Reporter: José F
>            Priority: Major
>             Fix For: 6.0.2
>
>
> Hi dear arrow developers. I tested inner_join with the arrow R package, but R crashed. Here is my example using the toy dataset iris:
>  
> data(iris)
> write.csv(iris, "iris.csv") # write csv file
> # write parket file with write_chunk_data function (below)
> walk("C:/Users/Stats/Desktop/ejemplo_join/iris.csv",
>      write_chunk_data, "C:/Users/Stats/Desktop/ejemplo_join/parquet", chunk_size = 50)
>  
> iris_arrow <- open_dataset("parquet")
> df1_arrow <- iris_arrow %>% select(`...1`, Sepal.Length, Sepal.Width, Petal.Length) 
> df2_arrow <-   iris_arrow %>% select(`...1`, Petal.Width, Species,) d
> df <- tabla1_arrow %>% inner_join(tabla2_arrow, by = "...1") %>%
> group_by(Species) %>% summarise(prom = mean(Sepal.Length)) %>% collect()
> print(df)
>  
>  
> # Run this function to write parquet files in this example please
>  write_chunk_data <- function(data_path, output_dir, chunk_size = 1000000) {
>   #If the output_dir do not exist, it is created
>   if (!fs::dir_exists(output_dir)) fs::dir_create(output_dir)
>   #It gets the name of the file
>   data_name <- fs::path_ext_remove(fs::path_file(data_path))
>   #It sets the chunk_num to 0
>   chunk_num <- 0
>   #Read the file using vroom
>   data_chunk <- vroom::vroom(data_path)
>   #It gets the variable names
>   data_names <- names(data_chunk)
>   #It gets the number of rows
>   rows<-nrow(data_chunk)
>   
>   #The following loop creates a parquet file for every [chunk_size] rows
>   repeat{
>     #It checks if we are over the max rows
>     if(rows>(chunk_num+1)*chunk_size)
> {       arrow::write_parquet(data_chunk[(chunk_num*chunk_size+1):((chunk_num+1)*chunk_size),],                             fs::path(output_dir, glue::glue("\\{data_name}
> -\{chunk_num}.parquet")))
>     }
>     else
> {       arrow::write_parquet(data_chunk[(chunk_num*chunk_size+1):rows,],                             fs::path(output_dir, glue::glue("\\{data_name}
> -\{chunk_num}.parquet"))) 
>       break
>     }
>     chunk_num <- chunk_num + 1
>   }
>    
>   #This is to recover some memory and space in the disk
>   rm(data_chunk)
>   tmp_file <- tempdir()
>   files <- list.files(tmp_file, full.names = T, pattern = "^vroom")
>   file.remove(files)
> }
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)