You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zy...@apache.org on 2023/06/08 17:41:45 UTC
[doris] branch master updated: [fix](dbt) dbt incremental append (#20513)
This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e1184bf4dc [fix](dbt) dbt incremental append (#20513)
e1184bf4dc is described below
commit e1184bf4dc6f513e23fcbc9605d897c101e7dff6
Author: catpineapple <42...@users.noreply.github.com>
AuthorDate: Fri Jun 9 01:41:33 2023 +0800
[fix](dbt) dbt incremental append (#20513)
---
.../dbt-doris/dbt/adapters/doris/__version__.py | 2 +-
.../dbt-doris/dbt/include/doris/dbt_project.yml | 2 +-
.../dbt/include/doris/macros/adapters/relation.sql | 2 +-
.../macros/materializations/incremental/help.sql | 7 +-
.../materializations/incremental/incremental.sql | 101 +++++++++++++--------
extension/dbt-doris/setup.py | 5 +-
6 files changed, 76 insertions(+), 43 deletions(-)
diff --git a/extension/dbt-doris/dbt/adapters/doris/__version__.py b/extension/dbt-doris/dbt/adapters/doris/__version__.py
index e7da78ed97..201fc2407f 100644
--- a/extension/dbt-doris/dbt/adapters/doris/__version__.py
+++ b/extension/dbt-doris/dbt/adapters/doris/__version__.py
@@ -22,4 +22,4 @@
# this 'version' must be set !!!
# otherwise the adapters will not be found after the 'dbt init xxx' command
-version = "1.3.0"
\ No newline at end of file
+version = "0.2.1"
diff --git a/extension/dbt-doris/dbt/include/doris/dbt_project.yml b/extension/dbt-doris/dbt/include/doris/dbt_project.yml
index a0518da8df..1cd7e916a8 100644
--- a/extension/dbt-doris/dbt/include/doris/dbt_project.yml
+++ b/extension/dbt-doris/dbt/include/doris/dbt_project.yml
@@ -19,7 +19,7 @@
# under the License.
name: dbt_doris
-version: 1.3.0
+version: 0.2.1
config-version: 2
macro-paths: ["macros"]
diff --git a/extension/dbt-doris/dbt/include/doris/macros/adapters/relation.sql b/extension/dbt-doris/dbt/include/doris/macros/adapters/relation.sql
index b84e5d529e..c60201c51b 100644
--- a/extension/dbt-doris/dbt/include/doris/macros/adapters/relation.sql
+++ b/extension/dbt-doris/dbt/include/doris/macros/adapters/relation.sql
@@ -81,7 +81,7 @@
{%- endmacro %}
{% macro doris__properties() -%}
- {% set properties = config.get('properties', validator=validation.any[dict]) or {"replication_num":"1"} %}
+ {% set properties = config.get('properties', validator=validation.any[dict]) %}
{% if properties is not none %}
PROPERTIES (
{% for key, value in properties.items() %}
diff --git a/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/help.sql b/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/help.sql
index 01dd9e5ffb..be8af0bb23 100644
--- a/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/help.sql
+++ b/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/help.sql
@@ -56,7 +56,12 @@
show create table {{ target_relation }}
{%- endmacro %}
-{% macro is_unique_model( table_create_obj ) %}
+{% macro is_unique_model( target_relation ) %}
+ {% set build_show_create = show_create( target_relation, statement_name='table_model') %}
+ {% call statement('table_model' , fetch_result=True) %}
+ {{ build_show_create }}
+ {% endcall %}
+ {%- set table_create_obj = load_result('table_model') -%}
{% set create_table = table_create_obj['data'][0][1]%}
{{ return('\nUNIQUE KEY(' in create_table and '\nDUPLICATE KEY(' not in create_table and '\nAGGREGATE KEY(' not in create_table) }}
{%- endmacro %}
diff --git a/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/incremental.sql b/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/incremental.sql
index 94f06444de..f15a9d0896 100644
--- a/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/incremental.sql
+++ b/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/incremental.sql
@@ -17,56 +17,71 @@
{% materialization incremental, adapter='doris' %}
{% set unique_key = config.get('unique_key', validator=validation.any[list]) %}
- {%- set inserts_only = config.get('inserts_only') -%}
-
+ {% set strategy = dbt_doris_validate_get_incremental_strategy(config) %}
+ {% set full_refresh_mode = (should_full_refresh()) %}
{% set target_relation = this.incorporate(type='table') %}
-
-
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}
-
{{ run_hooks(pre_hooks, inside_transaction=False) }}
-
{{ run_hooks(pre_hooks, inside_transaction=True) }}
-
{% set to_drop = [] %}
+ {#-- append or no unique key --#}
- {% if unique_key is none or inserts_only %}
- {% set build_sql = tmp_insert(tmp_relation, target_relation, unique_key=none) %}
- {% elif existing_relation is none %}
- {% set build_sql = doris__create_unique_table_as(False, target_relation, sql) %}
- {% elif existing_relation.is_view or should_full_refresh() %}
- {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
- {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %}
- {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
- {% do adapter.drop_relation(backup_relation) %}
- {% do adapter.rename_relation(target_relation, backup_relation) %}
- {% set build_sql = doris__create_unique_table_as(False, target_relation, sql) %}
- {% do to_drop.append(backup_relation) %}
- {% else %}
- {% set build_show_create = show_create( target_relation, statement_name="table_model") %}
- {% call statement('table_model' , fetch_result=True) %}
- {{ build_show_create }}
- {% endcall %}
- {%- set table_create_obj = load_result('table_model') -%}
- {% if not is_unique_model(table_create_obj) %}
- {% do exceptions.raise_compiler_error("doris table:"~ target_relation ~ ", model must be 'UNIQUE'" ) %}
- {% endif %}
- {% do run_query(create_table_as(True, tmp_relation, sql)) %}
- {% do to_drop.append(tmp_relation) %}
- {% do adapter.expand_target_column_types(
- from_relation=tmp_relation,
- to_relation=target_relation) %}
- {% set build_sql = tmp_insert(tmp_relation, target_relation, unique_key=unique_key) %}
+ {% if unique_key is none or strategy == 'append' %}
+ {#-- create table first --#}
+ {% if existing_relation is none %}
+ {% set build_sql = doris__create_table_as(False, target_relation, sql) %}
+ {% elif existing_relation.is_view or full_refresh_mode %}
+ {#-- backup data before drop old table #}
+ {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %}
+ {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
+ {% do adapter.drop_relation(backup_relation) %} {#-- likes 'drop table if exists ... ' --#}
+ {% do adapter.rename_relation(target_relation, backup_relation) %}
+ {% set build_sql = doris__create_table_as(False, target_relation, sql) %}
+ {% do to_drop.append(backup_relation) %}
+ {#-- append data --#}
+ {% else %}
+ {% do run_query(create_table_as(True, tmp_relation, sql)) %}
+ {% set build_sql = tmp_insert(tmp_relation, target_relation, unique_key=none) %}
+ {% endif %}
+ {#-- insert overwrite --#}
+ {% elif strategy == 'insert_overwrite' %}
+ {#-- create table first --#}
+ {% if existing_relation is none %}
+ {% set build_sql = doris__create_unique_table_as(False, target_relation, sql) %}
+ {#-- insert data refresh --#}
+ {% elif existing_relation.is_view or full_refresh_mode %}
+ {#-- backup data before drop old table #}
+ {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %}
+ {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
+ {% do adapter.drop_relation(backup_relation) %} {#-- likes 'drop table if exists ... ' --#}
+ {% do adapter.rename_relation(target_relation, backup_relation) %}
+ {% set build_sql = doris__create_unique_table_as(False, target_relation, sql) %}
+ {% do to_drop.append(backup_relation) %}
+ {#-- append data --#}
+ {% else %}
+ {#-- check doris unique table --#}
+ {% if not is_unique_model(target_relation) %}
+ {% do exceptions.raise_compiler_error("doris table:"~ target_relation ~ ", model must be 'UNIQUE'" ) %}
+ {% endif %}
+ {#-- create temp duplicate table for this incremental task --#}
+ {% do run_query(create_table_as(True, tmp_relation, sql)) %}
+ {% do to_drop.append(tmp_relation) %}
+ {% do adapter.expand_target_column_types(
+ from_relation=tmp_relation,
+ to_relation=target_relation) %}
+ {% set build_sql = tmp_insert(tmp_relation, target_relation, unique_key=unique_key) %}
+ {% endif %}
+ {% else %}
+ {#-- never --#}
{% endif %}
{% call statement("main") %}
{{ build_sql }}
{% endcall %}
-
- {% do persist_docs(target_relation, model) %}
+ {#-- {% do persist_docs(target_relation, model) %} #}
{{ run_hooks(post_hooks, inside_transaction=True) }}
{% do adapter.commit() %}
{% for rel in to_drop %}
@@ -74,5 +89,17 @@
{% endfor %}
{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ return({'relations': [target_relation]}) }}
-
{%- endmaterialization %}
+
+{% macro dbt_doris_validate_get_incremental_strategy(config) %}
+ {#-- Find and validate the incremental strategy #}
+ {%- set strategy = config.get('incremental_strategy') or 'insert_overwrite' -%}
+ {% set invalid_strategy_msg -%}
+ Invalid incremental strategy provided: {{ strategy }}
+ Expected one of: 'append', 'insert_overwrite'
+ {%- endset %}
+ {% if strategy not in ['append', 'insert_overwrite'] %}
+ {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
+ {% endif %}
+ {% do return (strategy) %}
+{% endmacro %}
diff --git a/extension/dbt-doris/setup.py b/extension/dbt-doris/setup.py
index 41658c2624..0f515fe953 100644
--- a/extension/dbt-doris/setup.py
+++ b/extension/dbt-doris/setup.py
@@ -22,7 +22,7 @@ from setuptools import find_namespace_packages, setup
package_name = "dbt-doris"
# make sure this always matches dbt/adapters/{adapter}/__version__.py
-package_version = "1.3.0"
+package_version = "0.2.1"
dbt_core_version = "1.3.0"
description = """The doris adapter plugin for dbt """
@@ -39,6 +39,7 @@ setup(
install_requires=[
"dbt-core~={}".format(dbt_core_version),
"mysql-connector-python>=8.0.0,<8.1",
+ "urllib3~=1.0",
],
- python_requires=">=3.8,<=3.10",
+ python_requires=">=3.7.2",
)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org