You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zy...@apache.org on 2023/06/08 17:41:45 UTC

[doris] branch master updated: [fix](dbt) dbt incremental append (#20513)

This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e1184bf4dc [fix](dbt) dbt incremental append (#20513)
e1184bf4dc is described below

commit e1184bf4dc6f513e23fcbc9605d897c101e7dff6
Author: catpineapple <42...@users.noreply.github.com>
AuthorDate: Fri Jun 9 01:41:33 2023 +0800

    [fix](dbt) dbt incremental append (#20513)
---
 .../dbt-doris/dbt/adapters/doris/__version__.py    |   2 +-
 .../dbt-doris/dbt/include/doris/dbt_project.yml    |   2 +-
 .../dbt/include/doris/macros/adapters/relation.sql |   2 +-
 .../macros/materializations/incremental/help.sql   |   7 +-
 .../materializations/incremental/incremental.sql   | 101 +++++++++++++--------
 extension/dbt-doris/setup.py                       |   5 +-
 6 files changed, 76 insertions(+), 43 deletions(-)

diff --git a/extension/dbt-doris/dbt/adapters/doris/__version__.py b/extension/dbt-doris/dbt/adapters/doris/__version__.py
index e7da78ed97..201fc2407f 100644
--- a/extension/dbt-doris/dbt/adapters/doris/__version__.py
+++ b/extension/dbt-doris/dbt/adapters/doris/__version__.py
@@ -22,4 +22,4 @@
 # this 'version' must be set !!!
 # otherwise the adapters will not be found after the 'dbt init xxx' command 
 
-version = "1.3.0"
\ No newline at end of file
+version = "0.2.1"
diff --git a/extension/dbt-doris/dbt/include/doris/dbt_project.yml b/extension/dbt-doris/dbt/include/doris/dbt_project.yml
index a0518da8df..1cd7e916a8 100644
--- a/extension/dbt-doris/dbt/include/doris/dbt_project.yml
+++ b/extension/dbt-doris/dbt/include/doris/dbt_project.yml
@@ -19,7 +19,7 @@
 # under the License.
 
 name: dbt_doris
-version: 1.3.0
+version: 0.2.1
 config-version: 2
 
 macro-paths: ["macros"]
diff --git a/extension/dbt-doris/dbt/include/doris/macros/adapters/relation.sql b/extension/dbt-doris/dbt/include/doris/macros/adapters/relation.sql
index b84e5d529e..c60201c51b 100644
--- a/extension/dbt-doris/dbt/include/doris/macros/adapters/relation.sql
+++ b/extension/dbt-doris/dbt/include/doris/macros/adapters/relation.sql
@@ -81,7 +81,7 @@
 {%- endmacro %}
 
 {% macro doris__properties() -%}
-  {% set properties = config.get('properties', validator=validation.any[dict]) or {"replication_num":"1"} %}
+  {% set properties = config.get('properties', validator=validation.any[dict]) %}
   {% if properties is not none %}
     PROPERTIES (
         {% for key, value in properties.items() %}
diff --git a/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/help.sql b/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/help.sql
index 01dd9e5ffb..be8af0bb23 100644
--- a/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/help.sql
+++ b/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/help.sql
@@ -56,7 +56,12 @@
     show create table {{ target_relation }}
 {%- endmacro %}
 
-{% macro is_unique_model( table_create_obj ) %}
+{% macro is_unique_model( target_relation ) %}
+    {% set build_show_create = show_create( target_relation, statement_name='table_model') %}
+    {% call statement('table_model' , fetch_result=True)  %}
+        {{ build_show_create }}
+    {% endcall %}
+    {%- set table_create_obj = load_result('table_model') -%}
     {% set create_table = table_create_obj['data'][0][1]%}
     {{ return('\nUNIQUE KEY(' in create_table  and '\nDUPLICATE KEY(' not in create_table and '\nAGGREGATE KEY(' not in create_table) }}
 {%- endmacro %}
diff --git a/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/incremental.sql b/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/incremental.sql
index 94f06444de..f15a9d0896 100644
--- a/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/incremental.sql
+++ b/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/incremental.sql
@@ -17,56 +17,71 @@
 
 {% materialization incremental, adapter='doris' %}
   {% set unique_key = config.get('unique_key', validator=validation.any[list]) %}
-  {%- set inserts_only = config.get('inserts_only') -%}
-
+  {% set strategy = dbt_doris_validate_get_incremental_strategy(config) %}
+  {% set full_refresh_mode = (should_full_refresh()) %}
   {% set target_relation = this.incorporate(type='table') %}
-
-
   {% set existing_relation = load_relation(this) %}
   {% set tmp_relation = make_temp_relation(this) %}
-
   {{ run_hooks(pre_hooks, inside_transaction=False) }}
-
   {{ run_hooks(pre_hooks, inside_transaction=True) }}
-
   {% set to_drop = [] %}
+  {#-- append or no unique key --#}
 
-  {% if unique_key is none or inserts_only  %}
-        {% set build_sql = tmp_insert(tmp_relation, target_relation, unique_key=none) %}
-  {% elif existing_relation is none %}
-      {% set build_sql = doris__create_unique_table_as(False, target_relation, sql) %}
-  {% elif existing_relation.is_view or should_full_refresh() %}
-      {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
-      {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %}
-      {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
-      {% do adapter.drop_relation(backup_relation) %}
-      {% do adapter.rename_relation(target_relation, backup_relation) %}
-      {% set build_sql = doris__create_unique_table_as(False, target_relation, sql) %}
-      {% do to_drop.append(backup_relation) %}
-  {% else %}
-      {% set build_show_create = show_create( target_relation, statement_name="table_model") %}
-        {% call statement('table_model' , fetch_result=True)  %}
-            {{ build_show_create }}
-        {% endcall %}
-      {%- set table_create_obj = load_result('table_model') -%}
-      {% if not is_unique_model(table_create_obj) %}
-            {% do exceptions.raise_compiler_error("doris table:"~ target_relation ~ ", model must be 'UNIQUE'" ) %}
-      {% endif %}
-      {% do run_query(create_table_as(True, tmp_relation, sql)) %}
-      {% do to_drop.append(tmp_relation) %}
 
-      {% do adapter.expand_target_column_types(
-             from_relation=tmp_relation,
-             to_relation=target_relation) %}
-      {% set build_sql = tmp_insert(tmp_relation, target_relation, unique_key=unique_key) %}
+  {% if unique_key is none or strategy == 'append'  %}
+        {#-- create table first --#}
+        {% if existing_relation is none  %}
+            {% set build_sql = doris__create_table_as(False, target_relation, sql) %}
+        {% elif existing_relation.is_view or full_refresh_mode %}
+            {#-- back up the old table before replacing it #}
+            {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %}
+            {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
+            {% do adapter.drop_relation(backup_relation) %} {#-- like 'drop table if exists ...' --#}
+            {% do adapter.rename_relation(target_relation, backup_relation) %}
+            {% set build_sql = doris__create_table_as(False, target_relation, sql) %}
+            {% do to_drop.append(backup_relation) %}
+        {#-- append data --#}
+        {% else %}
+            {% do run_query(create_table_as(True, tmp_relation, sql)) %}
+            {% set build_sql = tmp_insert(tmp_relation, target_relation, unique_key=none) %}
+        {% endif %}
+  {#-- insert overwrite --#}
+  {% elif strategy == 'insert_overwrite' %}
+        {#-- create table first --#}
+        {% if existing_relation is none  %}
+            {% set build_sql = doris__create_unique_table_as(False, target_relation, sql) %}
+        {#-- insert data refresh --#}
+        {% elif existing_relation.is_view or full_refresh_mode %}
+            {#-- back up the old table before replacing it #}
+            {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %}
+            {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
+            {% do adapter.drop_relation(backup_relation) %} {#-- like 'drop table if exists ...' --#}
+            {% do adapter.rename_relation(target_relation, backup_relation) %}
+            {% set build_sql = doris__create_unique_table_as(False, target_relation, sql) %}
+            {% do to_drop.append(backup_relation) %}
+        {#-- append data --#}
+        {% else %}
+          {#-- check doris unique table  --#}
+          {% if not is_unique_model(target_relation) %}
+                {% do exceptions.raise_compiler_error("doris table:"~ target_relation ~ ", model must be 'UNIQUE'" ) %}
+          {% endif %}
+          {#-- create temp duplicate table for this incremental task  --#}
+          {% do run_query(create_table_as(True, tmp_relation, sql)) %}
+          {% do to_drop.append(tmp_relation) %}
+          {% do adapter.expand_target_column_types(
+                 from_relation=tmp_relation,
+                 to_relation=target_relation) %}
+          {% set build_sql = tmp_insert(tmp_relation, target_relation, unique_key=unique_key) %}
+        {% endif %}
+  {% else %}
+          {#-- never reached: strategy is validated to be 'append' or 'insert_overwrite' --#}
   {% endif %}
 
   {% call statement("main") %}
       {{ build_sql }}
   {% endcall %}
 
-
-  {% do persist_docs(target_relation, model) %}
+  {#--  {% do persist_docs(target_relation, model) %}  #}
   {{ run_hooks(post_hooks, inside_transaction=True) }}
   {% do adapter.commit() %}
   {% for rel in to_drop %}
@@ -74,5 +89,17 @@
   {% endfor %}
   {{ run_hooks(post_hooks, inside_transaction=False) }}
   {{ return({'relations': [target_relation]}) }}
-
 {%- endmaterialization %}
+
+{% macro dbt_doris_validate_get_incremental_strategy(config) %}
+  {#-- Find and validate the incremental strategy #}
+  {%- set strategy = config.get('incremental_strategy') or 'insert_overwrite' -%}
+  {% set invalid_strategy_msg -%}
+    Invalid incremental strategy provided: {{ strategy }}
+    Expected one of: 'append', 'insert_overwrite'
+  {%- endset %}
+  {% if strategy not in ['append', 'insert_overwrite'] %}
+    {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
+  {% endif %}
+  {% do return (strategy) %}
+{% endmacro %}
diff --git a/extension/dbt-doris/setup.py b/extension/dbt-doris/setup.py
index 41658c2624..0f515fe953 100644
--- a/extension/dbt-doris/setup.py
+++ b/extension/dbt-doris/setup.py
@@ -22,7 +22,7 @@ from setuptools import find_namespace_packages, setup
 
 package_name = "dbt-doris"
 # make sure this always matches dbt/adapters/{adapter}/__version__.py
-package_version = "1.3.0"
+package_version = "0.2.1"
 dbt_core_version = "1.3.0"
 description = """The doris adapter plugin for dbt """
 
@@ -39,6 +39,7 @@ setup(
     install_requires=[
         "dbt-core~={}".format(dbt_core_version),
         "mysql-connector-python>=8.0.0,<8.1",
+        "urllib3~=1.0",
     ],
-    python_requires=">=3.8,<=3.10",
+    python_requires=">=3.7.2",
 )


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org