Posted to github@beam.apache.org by "DennisRutjes (via GitHub)" <gi...@apache.org> on 2023/05/30 11:06:50 UTC

[GitHub] [beam] DennisRutjes opened a new issue, #26935: FakeJobsService tries to base64 decode a plain QueryString resulting in an error.

DennisRutjes opened a new issue, #26935:
URL: https://github.com/apache/beam/issues/26935

   https://github.com/apache/beam/blob/fbeae980e93ea64fc8cc3ad074cbc8aebd157691/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java#L488
   
   I'm not sure whether I am doing something wrong when setting up the FakeBigQueryServices.
   
   I have a pipeline that reads from a BigQuery table (`select * from project:dataset.table`) and publishes the contents to a Pub/Sub topic.
   The pipeline is configured with FakeBigQueryServices and PubsubTestClient so that it makes no real calls to GCP.
   
   After I set up the expected tables with the expected rows, BigQueryIO ends up executing this part of FakeJobService:
   
   ```
   private JobStatus runQueryJob(JobConfigurationQuery query)
         throws IOException, InterruptedException  {
       List<TableRow> rows = FakeBigQueryServices.rowsFromEncodedQuery(query.getQuery());
       datasetService.createTable(new Table().setTableReference(query.getDestinationTable()));
       datasetService.insertAll(query.getDestinationTable(), rows, null);
       return new JobStatus().setState("DONE");
     }
   ```
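   Judging from the excerpt, `runQueryJob` never parses SQL: it hands `query.getQuery()` to `FakeBigQueryServices.rowsFromEncodedQuery`, which (as the `BaseEncoding` error suggests) Base64-decodes the string and turns the bytes back into rows. The sketch below mimics that contract with the JDK's `java.util.Base64`. It is an assumption-laden stand-in, not Beam code: the class `EncodedQuerySketch` and its methods are hypothetical, and rows are modelled as JSON strings joined by newlines rather than with Beam's own `TableRow` coder.
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Arrays;
   import java.util.Base64;
   import java.util.List;
   
   /**
    * Hypothetical illustration of the fake's contract: the "query" string is not SQL
    * but a Base64-encoded payload describing the rows the fake should return.
    * Assumption: rows are JSON strings separated by '\n' (Beam uses its own coder).
    */
   final class EncodedQuerySketch {
   
     /** Packs the expected rows into a string that can be passed where the query goes. */
     static String encodeRows(List<String> jsonRows) {
       byte[] payload = String.join("\n", jsonRows).getBytes(StandardCharsets.UTF_8);
       return Base64.getEncoder().encodeToString(payload);
     }
   
     /** Mirrors the fake's decoding step: Base64 first, then split the payload into rows. */
     static List<String> decodeRows(String encodedQuery) {
       byte[] payload = Base64.getDecoder().decode(encodedQuery);
       return Arrays.asList(new String(payload, StandardCharsets.UTF_8).split("\n"));
     }
   
     public static void main(String[] args) {
       List<String> rows =
           List.of("{\"name\":\"a\",\"number\":1}", "{\"name\":\"b\",\"number\":2}");
       String fakeQuery = encodeRows(rows);
       System.out.println(fakeQuery);             // an opaque Base64 string, not SQL
       System.out.println(decodeRows(fakeQuery)); // the original rows again
     }
   }
   ```
   
   If this reading is right, a test would pass such an encoded payload to `fromQuery(...)` instead of real SQL. `FakeBigQueryServices` appears to provide its own encoding helper for this; check its source for the exact method before relying on it.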
   
   The job's query is the plain string `select * from project:dataset.table`, but the fake apparently expects a Base64-encoded version of it, or encoded results?
   
   This results in:
   ```
   com.google.common.io.BaseEncoding$DecodingException: Unrecognized character: 0x20
   ```
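   The `0x20` in the message is the code point of the space character: every letter of `select` is a valid Base64 digit, so the decoder only fails at the first space. The sketch below reproduces the same failure with the JDK's `java.util.Base64` in place of Guava's `BaseEncoding` (a substitution, so the wording differs: the JDK reports the offending byte as `Illegal base64 character 20`). The class name `PlainSqlDecodeDemo` is hypothetical.
   
   ```java
   import java.util.Base64;
   
   /** Reproduces the decode failure using the JDK decoder instead of Guava's BaseEncoding. */
   final class PlainSqlDecodeDemo {
   
     /** Returns the decoder's error message, or null if the input is valid Base64. */
     static String tryDecode(String query) {
       try {
         Base64.getDecoder().decode(query);
         return null;
       } catch (IllegalArgumentException e) {
         return e.getMessage();
       }
     }
   
     public static void main(String[] args) {
       // Same shape as the query in the report: letters are Base64 digits, the space is not.
       System.out.println(tryDecode("select * from project:dataset.table"));
       // prints: Illegal base64 character 20
     }
   }
   ```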
   
   What am I missing?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [I] FakeJobsService tries to base64 decode a plain QueryString resulting in an error. [beam]

Posted by "hanalaydrus (via GitHub)" <gi...@apache.org>.
hanalaydrus commented on issue #26935:
URL: https://github.com/apache/beam/issues/26935#issuecomment-2060241169

   This happens to me too. Have you found a solution?

[GitHub] [beam] DennisRutjes commented on issue #26935: FakeJobsService tries to base64 decode a plain QueryString resulting in an error.

Posted by "DennisRutjes (via GitHub)" <gi...@apache.org>.
DennisRutjes commented on issue #26935:
URL: https://github.com/apache/beam/issues/26935#issuecomment-1569586169

   Here is code that reproduces the error:
   
   ```java
   package com.kramphub.davinci.pipelines.bigquery.job;
   
   import com.google.api.services.bigquery.model.*;
   import com.google.common.collect.ImmutableList;
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
   import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
   import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
   import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
   import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
   import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
   import org.apache.beam.sdk.options.PipelineOptions;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.junit.Rule;
   // JUnit 4 @Test, so that the JUnit 4 @Rule below is actually applied
   import org.junit.Test;
   import org.junit.rules.Timeout;
   
   import java.io.IOException;
   import java.nio.file.Files;
   import java.util.List;
   
   public class TestPipeline {
   
       @Rule
       public transient Timeout globalTimeout = Timeout.seconds(60);
   
       private FakeDatasetService fakeDatasetService = new FakeDatasetService();
       private FakeJobService fakeJobService = new FakeJobService();
       private FakeBigQueryServices fakeBigQueryServices =
               new FakeBigQueryServices()
                       .withDatasetService(fakeDatasetService)
                       .withJobService(fakeJobService);
   
       @Test
       public void testFakeDataService() throws IOException, InterruptedException {
           String testFolder = Files.createTempDirectory("tmpDirPrefix").toFile().getAbsolutePath();
   
           // Reset the fakes' shared in-memory state before seeding data
           FakeDatasetService.setUp();
   
           List<TableRow> data =
                   ImmutableList.of(
                           new TableRow().set("name", "a").set("number", 1L),
                           new TableRow().set("name", "b").set("number", 2L),
                           new TableRow().set("name", "c").set("number", 3L),
                           new TableRow().set("name", "d").set("number", 4L),
                           new TableRow().set("name", "e").set("number", 5L),
                           new TableRow().set("name", "f").set("number", 6L));
   
           String tableSpec = "project:data_set.table_name";
           TableReference table = BigQueryHelpers.parseTableSpec(tableSpec);
   
           // Seed the fake dataset service with the source table and its rows
           fakeDatasetService.createDataset("project", "data_set", "europe-west1", "", null);
           fakeDatasetService.createTable(
                   new Table()
                           .setTableReference(table)
                           .setSchema(
                                   new TableSchema()
                                           .setFields(
                                                   ImmutableList.of(
                                                           new TableFieldSchema().setName("name").setType("STRING"),
                                                           new TableFieldSchema().setName("number").setType("INTEGER")))));
           fakeDatasetService.insertAll(table, data, null);
   
           PipelineOptions options = PipelineOptionsFactory.create();
           options.setTempLocation(testFolder);
           BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
           bqOptions.setProject("project");
   
           Pipeline pipeline = Pipeline.create(options);
           // The query is plain SQL; FakeJobService fails when it tries to Base64-decode it
           pipeline.apply(BigQueryIO.readTableRows()
                   .withTestServices(fakeBigQueryServices)
                   .fromQuery(String.format("SELECT * FROM `%s`", tableSpec.replace(":", ".")))
                   .withQueryLocation("europe-west1")
                   .usingStandardSql()
                   .withTemplateCompatibility()
                   .withoutValidation()
           );
   
           // RUN
           pipeline.run().waitUntilFinish();
       }
   }
   ```
   
   Running it fails with:
   
   ```
   org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.io.IOException: Query job beam_bq_job_QUERY_testpipeline0531064239441eccb1_7835b3c830b4431dbfd01ebde163708b_22936c33de6f45798b1b43d18d6c2e1a failed, status: {
     "errorResult" : {
       "message" : "Job GenericData{classInfo=[copy, dryRun, extract, jobTimeoutMs, jobType, labels, load, query], {query=GenericData{classInfo=[allowLargeResults, clustering, connectionProperties, continuous, createDisposition, createSession, defaultDataset, destinationEncryptionConfiguration, destinationTable, flattenResults, maximumBillingTier, maximumBytesBilled, parameterMode, preserveNulls, priority, query, queryParameters, rangePartitioning, schemaUpdateOptions, tableDefinitions, timePartitioning, useLegacySql, useQueryCache, userDefinedFunctionResources, writeDisposition], {allowLargeResults=true, createDisposition=CREATE_IF_NEEDED, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=temp_dataset_beam_bq_job_QUERY_testpipelinedrutjes0531064239441eccb1_7835b3c830b4431dbfd01ebde163708b, projectId=project, tableId=temp_table_beam_bq_job_QUERY_testpipelinedrutjes0531064239441eccb1_7835b3c830b4431dbfd01ebde163708b}}, flattenResults=true, priority=BATCH, query=SELECT * FROM `project.data_set.table_name`, useLegacySql=false, writeDisposition=WRITE_TRUNCATE}}}} failed: java.lang.IllegalArgumentException: com.google.common.io.BaseEncoding$DecodingException: Unrecognized character: 0x20"
     },
     "state" : "FAILED"
   }
   ```