You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@arrow.apache.org by "GANG LIAO (Jira)" <ji...@apache.org> on 2021/02/09 22:55:00 UTC

[jira] [Updated] (ARROW-11578) Why does DataFusion throw a Tokio 0.2 runtime issue?

     [ https://issues.apache.org/jira/browse/ARROW-11578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

GANG LIAO updated ARROW-11578:
------------------------------
          Component/s: Rust - DataFusion
    Affects Version/s: 4.0.0
                       3.0.0
          Description: 
thread 'tests::simple_join' panicked at 'must be called from the context of a Tokio 0.2.x runtime configured with either `basic_scheduler` or `threaded_scheduler`'.

{code:rust}
    /// Joins two four-row in-memory tables on `t1.a = t2.c` and checks the first
    /// three rows of the result ordered by `b`.
    ///
    /// NOTE(review): the panic quoted above ("must be called from the context of a
    /// Tokio 0.2.x runtime") indicates the runtime started by `#[tokio::test]` is not
    /// the Tokio 0.2 runtime this DataFusion build expects. The `tokio` dependency
    /// used for the test attribute presumably needs to match the Tokio major version
    /// DataFusion depends on — confirm against the crate's Cargo.toml.
    #[tokio::test]
    async fn simple_join() -> Result<()> {
        let schema1 = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let schema2 = Arc::new(Schema::new(vec![
            Field::new("c", DataType::Utf8, false),
            Field::new("d", DataType::Int32, false),
        ]));

        // Left table t1(a, b).
        let batch1 = RecordBatch::try_new(
            Arc::clone(&schema1),
            vec![
                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
                Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
            ],
        )?;
        // Right table t2(c, d): same key values, so each t1 row matches exactly one t2 row.
        let batch2 = RecordBatch::try_new(
            Arc::clone(&schema2),
            vec![
                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
                Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
            ],
        )?;

        let mut ctx = ExecutionContext::new();

        let table1 = MemTable::try_new(schema1, vec![vec![batch1]])?;
        let table2 = MemTable::try_new(schema2, vec![vec![batch2]])?;

        ctx.register_table("t1", Box::new(table1));
        ctx.register_table("t2", Box::new(table2));

        // Two rows share b = 10, so `a` is added as a tie-breaker; without it the
        // relative order of the "b" and "c" rows is unspecified and the exact
        // line-by-line comparison below could fail spuriously.
        let sql = concat!(
            "SELECT a, b, d ",
            "FROM t1 JOIN t2 ON a = c ",
            "ORDER BY b ASC, a ASC ",
            "LIMIT 3"
        );

        let plan = ctx.create_logical_plan(sql)?;
        let plan = ctx.optimize(&plan)?;
        let plan = ctx.create_physical_plan(&plan)?;

        let batches = collect(plan).await?;
        let formatted = arrow::util::pretty::pretty_format_batches(&batches).unwrap();
        let actual_lines: Vec<&str> = formatted.trim().lines().collect();

        let expected = vec![
            "+---+----+----+",
            "| a | b  | d  |",
            "+---+----+----+",
            "| a | 1  | 1  |",
            "| b | 10 | 10 |",
            "| c | 10 | 10 |",
            "+---+----+----+",
        ];

        assert_eq!(expected, actual_lines);

        Ok(())
    }
{code}


  was:

thread 'tests::simple_join' panicked at 'must be called from the context of a Tokio 0.2.x runtime configured with either `basic_scheduler` or `threaded_scheduler`'.

{code:rust}
    /// Joins two four-row in-memory tables on `t1.a = t2.c` and checks the first
    /// three rows of the result ordered by `b`.
    ///
    /// NOTE(review): the panic quoted above ("must be called from the context of a
    /// Tokio 0.2.x runtime") indicates the runtime started by `#[tokio::test]` is not
    /// the Tokio 0.2 runtime this DataFusion build expects. The `tokio` dependency
    /// used for the test attribute presumably needs to match the Tokio major version
    /// DataFusion depends on — confirm against the crate's Cargo.toml.
    #[tokio::test]
    async fn simple_join() -> Result<()> {
        let schema1 = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let schema2 = Arc::new(Schema::new(vec![
            Field::new("c", DataType::Utf8, false),
            Field::new("d", DataType::Int32, false),
        ]));

        // Left table t1(a, b).
        let batch1 = RecordBatch::try_new(
            Arc::clone(&schema1),
            vec![
                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
                Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
            ],
        )?;
        // Right table t2(c, d): same key values, so each t1 row matches exactly one t2 row.
        let batch2 = RecordBatch::try_new(
            Arc::clone(&schema2),
            vec![
                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
                Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
            ],
        )?;

        let mut ctx = ExecutionContext::new();

        let table1 = MemTable::try_new(schema1, vec![vec![batch1]])?;
        let table2 = MemTable::try_new(schema2, vec![vec![batch2]])?;

        ctx.register_table("t1", Box::new(table1));
        ctx.register_table("t2", Box::new(table2));

        // Two rows share b = 10, so `a` is added as a tie-breaker; without it the
        // relative order of the "b" and "c" rows is unspecified and the exact
        // line-by-line comparison below could fail spuriously.
        let sql = concat!(
            "SELECT a, b, d ",
            "FROM t1 JOIN t2 ON a = c ",
            "ORDER BY b ASC, a ASC ",
            "LIMIT 3"
        );

        let plan = ctx.create_logical_plan(sql)?;
        let plan = ctx.optimize(&plan)?;
        let plan = ctx.create_physical_plan(&plan)?;

        let batches = collect(plan).await?;
        let formatted = arrow::util::pretty::pretty_format_batches(&batches).unwrap();
        let actual_lines: Vec<&str> = formatted.trim().lines().collect();

        let expected = vec![
            "+---+----+----+",
            "| a | b  | d  |",
            "+---+----+----+",
            "| a | 1  | 1  |",
            "| b | 10 | 10 |",
            "| c | 10 | 10 |",
            "+---+----+----+",
        ];

        assert_eq!(expected, actual_lines);

        Ok(())
    }
{code}



> Why does DataFusion throw a Tokio 0.2 runtime issue?
> ----------------------------------------------------
>
>                 Key: ARROW-11578
>                 URL: https://issues.apache.org/jira/browse/ARROW-11578
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Rust - DataFusion
>    Affects Versions: 3.0.0, 4.0.0
>            Reporter: GANG LIAO
>            Priority: Major
>
> thread 'tests::simple_join' panicked at 'must be called from the context of a Tokio 0.2.x runtime configured with either `basic_scheduler` or `threaded_scheduler`'.
> {code:rust}
>     /// Joins two four-row in-memory tables on `t1.a = t2.c` and checks the first
>     /// three rows of the result ordered by `b`.
>     ///
>     /// NOTE(review): the panic quoted above ("must be called from the context of a
>     /// Tokio 0.2.x runtime") indicates the runtime started by `#[tokio::test]` is not
>     /// the Tokio 0.2 runtime this DataFusion build expects. The `tokio` dependency
>     /// used for the test attribute presumably needs to match the Tokio major version
>     /// DataFusion depends on — confirm against the crate's Cargo.toml.
>     #[tokio::test]
>     async fn simple_join() -> Result<()> {
>         let schema1 = Arc::new(Schema::new(vec![
>             Field::new("a", DataType::Utf8, false),
>             Field::new("b", DataType::Int32, false),
>         ]));
>         let schema2 = Arc::new(Schema::new(vec![
>             Field::new("c", DataType::Utf8, false),
>             Field::new("d", DataType::Int32, false),
>         ]));
>         // Left table t1(a, b).
>         let batch1 = RecordBatch::try_new(
>             Arc::clone(&schema1),
>             vec![
>                 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
>                 Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
>             ],
>         )?;
>         // Right table t2(c, d): same key values, so each t1 row matches exactly one t2 row.
>         let batch2 = RecordBatch::try_new(
>             Arc::clone(&schema2),
>             vec![
>                 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
>                 Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
>             ],
>         )?;
>         let mut ctx = ExecutionContext::new();
>         let table1 = MemTable::try_new(schema1, vec![vec![batch1]])?;
>         let table2 = MemTable::try_new(schema2, vec![vec![batch2]])?;
>         ctx.register_table("t1", Box::new(table1));
>         ctx.register_table("t2", Box::new(table2));
>         // Two rows share b = 10, so `a` is added as a tie-breaker; without it the
>         // relative order of the "b" and "c" rows is unspecified and the exact
>         // line-by-line comparison below could fail spuriously.
>         let sql = concat!(
>             "SELECT a, b, d ",
>             "FROM t1 JOIN t2 ON a = c ",
>             "ORDER BY b ASC, a ASC ",
>             "LIMIT 3"
>         );
>         let plan = ctx.create_logical_plan(sql)?;
>         let plan = ctx.optimize(&plan)?;
>         let plan = ctx.create_physical_plan(&plan)?;
>         let batches = collect(plan).await?;
>         let formatted = arrow::util::pretty::pretty_format_batches(&batches).unwrap();
>         let actual_lines: Vec<&str> = formatted.trim().lines().collect();
>         let expected = vec![
>             "+---+----+----+",
>             "| a | b  | d  |",
>             "+---+----+----+",
>             "| a | 1  | 1  |",
>             "| b | 10 | 10 |",
>             "| c | 10 | 10 |",
>             "+---+----+----+",
>         ];
>         assert_eq!(expected, actual_lines);
>         Ok(())
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)