You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "Roberto Garcia (Jira)" <ji...@apache.org> on 2020/09/20 06:06:00 UTC

[jira] [Comment Edited] (NIFI-7820) How to connect to controller service DBCP connection pool and execute the sql using that connection via python

    [ https://issues.apache.org/jira/browse/NIFI-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17198908#comment-17198908 ] 

Roberto Garcia edited comment on NIFI-7820 at 9/20/20, 6:05 AM:
----------------------------------------------------------------

 

tested in Python 3.8.5, nifi-1.12.0, MySQL 8.0.21

!image-2020-09-20-01-44-32-117.png|width=1088,height=430!

 

!image-2020-09-20-01-58-03-615.png|width=696,height=491!

 

!image-2020-09-20-01-45-19-739.png|width=646,height=452!
h3. */home/sample/tmp/sample.py*

 
{code:java}
#!/usr/bin/env python3

import re
import sys
import string
import mysql.connector
from  mysql.connector import errorcode

config = {
  'user':'sample',
  'password':'myStrongPassword',
  'host':'127.0.0.1',
  'database':'sample'
}

try:
  cnx = mysql.connector.connect(**config)
except mysql.connector.Erro as err:
  if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
     print("Something is wrong with your user name or password")
  elif err.errno == errorcode.ER_BAD_DB_ERROR:
     print("Database does not exist")
  else:
     print(err)

cursor = cnx.cursor()

query = (
    "SELECT REPLACE(REPLACE(REPLACE(REPLACE(column_type,'enum',''),')',''),'(',''),'\\'','') enums "
    "FROM INFORMATION_SCHEMA.COLUMNS "
    "WHERE table_name='sample_with_enum' AND column_name='gender'")

cursor.execute(query)

row = str(cursor.fetchone()[0])
cursor.close()
cnx.close()

flowfile=sys.stdin.readline()

match = re.search('(?<={"id":2,"name":"gender","column_type":1,"value":)(.*?)(?=})', flowfile, re.DOTALL)
if match:
    index = int(match.group(1)) -1

enums = row.split(',')

newFlowfile = re.sub('(?<={"id":2,"name":"gender","column_type":1,"value":)(.*?)(?=})', f'"{enums[index]}"', flowfile, 0, re.DOTALL)
print(newFlowfile)

{code}
 

 

 

 

 


was (Author: robertogarcia):
 

tested in Python 3.8.5, nifi-1.12.0, MySQL 8.0.21

!image-2020-09-20-01-44-32-117.png!

 

!image-2020-09-20-01-58-03-615.png!

 

!image-2020-09-20-01-45-19-739.png!
h3. */home/sample/tmp/sample.py*

 
{code:java}
#!/usr/bin/env python3

import re
import sys
import string
import mysql.connector
from  mysql.connector import errorcode

config = {
  'user':'sample',
  'password':'myStrongPassword',
  'host':'127.0.0.1',
  'database':'sample'
}

try:
  cnx = mysql.connector.connect(**config)
except mysql.connector.Erro as err:
  if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
     print("Something is wrong with your user name or password")
  elif err.errno == errorcode.ER_BAD_DB_ERROR:
     print("Database does not exist")
  else:
     print(err)

cursor = cnx.cursor()

query = (
    "SELECT REPLACE(REPLACE(REPLACE(REPLACE(column_type,'enum',''),')',''),'(',''),'\\'','') enums "
    "FROM INFORMATION_SCHEMA.COLUMNS "
    "WHERE table_name='sample_with_enum' AND column_name='gender'")

cursor.execute(query)

row = str(cursor.fetchone()[0])
cursor.close()
cnx.close()

flowfile=sys.stdin.readline()

match = re.search('(?<={"id":2,"name":"gender","column_type":1,"value":)(.*?)(?=})', flowfile, re.DOTALL)
if match:
    index = int(match.group(1)) -1

enums = row.split(',')

newFlowfile = re.sub('(?<={"id":2,"name":"gender","column_type":1,"value":)(.*?)(?=})', f'"{enums[index]}"', flowfile, 0, re.DOTALL)
print(newFlowfile)

{code}
 

 

 

 

 

> How to connect to controller service DBCP connection pool and execute the sql using that connection via python 
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: NIFI-7820
>                 URL: https://issues.apache.org/jira/browse/NIFI-7820
>             Project: Apache NiFi
>          Issue Type: Task
>          Components: NiFi Stateless
>            Reporter: zeyk
>            Priority: Major
>         Attachments: enumsWithPython.xml, execute_script.py, image-2020-09-20-01-44-32-117.png, image-2020-09-20-01-45-19-739.png, image-2020-09-20-01-58-03-615.png
>
>
> I have a python code updated with my own logic to replace the index of enums with values, but in order to do so , i need to execute certain sql commands to get some values and the best thing would be to get the dbcp connection pool connection from nifi controller services and execute the commands , but i dont know to how to implement that in python, since few modules such as pymysql doesnt support in Nifi(Jython) ,Please find the code below:
> Any kind of help would be highly appreciated. Thanks in advance
>  
> {color:#89ddff}import{color}{color:#eeffff} json{color}
> {color:#89ddff}import{color}{color:#eeffff} re{color}
> {color:#89ddff}import{color}{color:#eeffff} sys{color}
> {color:#89ddff}import{color}{color:#eeffff} traceback{color}
> {color:#89ddff}from{color}{color:#eeffff} java.nio.charset {color}{color:#89ddff}import{color}{color:#eeffff} StandardCharsets{color}
> {color:#89ddff}from{color}{color:#eeffff} org.apache.commons.io {color}{color:#89ddff}import{color}{color:#eeffff} IOUtils{color}
> {color:#89ddff}from{color}{color:#eeffff} org.apache.nifi.processor.io {color}{color:#89ddff}import{color}{color:#eeffff} StreamCallback{color}
> {color:#89ddff}from{color}{color:#eeffff} org.python.core.util {color}{color:#89ddff}import{color}{color:#eeffff} StringUtil{color}
> {color:#c792ea}class{color}{color:#eeffff} {color}{color:#ffcb6b}TransformCallback{color}{color:#89ddff}({color}{color:#c3e88d}StreamCallback{color}{color:#89ddff}):{color}
> {color:#eeffff}    {color}{color:#c792ea}def{color}{color:#eeffff} {color}{color:#82aaff}__init__{color}{color:#89ddff}({color}{color:#ff5370}self{color}{color:#89ddff}):{color}
> {color:#eeffff}        {color}{color:#89ddff}pass{color}
> {color:#eeffff}    {color}{color:#c792ea}def{color}{color:#eeffff} {color}{color:#82aaff}process{color}{color:#89ddff}({color}{color:#ff5370}self{color}{color:#89ddff},{color}{color:#eeffff} {color}{color:#ff5370}inputStream{color}{color:#89ddff},{color}{color:#eeffff} {color}{color:#ff5370}outputStream{color}{color:#89ddff}):{color}
> {color:#eeffff}        {color}{color:#89ddff}try{color}{color:#eeffff}:{color}
> {color:#89ddff}            {color}{color:#546e7a}# Read input FlowFile content{color}
> {color:#eeffff}            input_text {color}{color:#c792ea}={color}{color:#eeffff} IOUtils.toString{color}{color:#89ddff}({color}{color:#eeffff}inputStream, StandardCharsets.UTF_8{color}{color:#89ddff}){color}
> {color:#eeffff}            input_obj {color}{color:#c792ea}={color}{color:#eeffff} json.loads{color}{color:#89ddff}({color}{color:#eeffff}input_text{color}{color:#89ddff}){color}
> {color:#eeffff} {color}
> {color:#eeffff}            {color}
> {color:#eeffff}            {color}{color:#eeffff}table_name {color}{color:#c792ea}={color}{color:#eeffff} input_obj{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}table_name{color}{color:#89ddff}'{color}{color:#89ddff}]{color}
> {color:#eeffff}            column_name {color}{color:#c792ea}={color}{color:#eeffff}  {color}{color:#89ddff}"{color}{color:#f78c6c}SELECT{color}{color:#c3e88d} column_name {color}{color:#f78c6c}FROM{color}{color:#c3e88d} INFORMATION_SCHEMA.COLUMNS {color}{color:#f78c6c}WHERE{color}{color:#c3e88d} table_name{color}{color:#c792ea}={color}{color:#89ddff}'{color}{color:#c3e88d}"+table_name+"' {color}{color:#f78c6c}AND{color}{color:#c3e88d} data_type{color}{color:#c792ea}={color}{color:#89ddff}'{color}{color:#c3e88d}enum'{color}{color:#89ddff}"{color}
> {color:#eeffff}            enum_value_sql {color}{color:#c792ea}={color}{color:#eeffff} {color}{color:#89ddff}"{color}{color:#f78c6c}SELECT{color}{color:#c3e88d} {color}{color:#82aaff}REPLACE{color}{color:#c3e88d}({color}{color:#82aaff}REPLACE{color}{color:#c3e88d}({color}{color:#82aaff}REPLACE{color}{color:#c3e88d}({color}{color:#82aaff}REPLACE{color}{color:#c3e88d}(column_type,{color}{color:#89ddff}'{color}{color:#c3e88d}enum',{color}{color:#89ddff}'{color}{color:#c3e88d}'),{color}{color:#89ddff}'{color}{color:#c3e88d})',{color}{color:#89ddff}'{color}{color:#c3e88d}'),{color}{color:#89ddff}'{color}{color:#c3e88d}(',{color}{color:#89ddff}'{color}{color:#c3e88d}'),{color}{color:#89ddff}'{color}{color:#c3e88d}\'',{color}{color:#89ddff}'{color}{color:#c3e88d}') enums {color}{color:#f78c6c}FROM{color}{color:#c3e88d} INFORMATION_SCHEMA.COLUMNS {color}{color:#f78c6c}WHERE{color}{color:#c3e88d} table_name{color}{color:#c792ea}={color}{color:#89ddff}'{color}{color:#c3e88d}"+table_name+"' {color}{color:#f78c6c}AND{color}{color:#c3e88d} column_name{color}{color:#c792ea}={color}{color:#89ddff}'{color}{color:#c3e88d}"+column_name+"'{color}{color:#89ddff}"{color}
> {color:#eeffff}            enum_value {color}{color:#c792ea}={color}{color:#eeffff} enum_value_sql.split{color}{color:#89ddff}({color}{color:#89ddff}'{color}{color:#c3e88d},{color}{color:#89ddff}'{color}{color:#89ddff}){color}
> {color:#eeffff}            {color}{color:#89ddff}for{color}{color:#eeffff} col {color}{color:#c792ea}in{color}{color:#eeffff} input_obj{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}columns{color}{color:#89ddff}'{color}{color:#89ddff}]{color}{color:#eeffff}:{color}
> {color:#eeffff}                {color}{color:#89ddff}if{color}{color:#eeffff} col{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}name{color}{color:#89ddff}'{color}{color:#89ddff}]{color}{color:#eeffff} {color}{color:#c792ea}=={color}{color:#eeffff} {color}{color:#b2ccd6}str{color}{color:#89ddff}({color}{color:#eeffff}column_name{color}{color:#89ddff}){color}{color:#eeffff}:{color}
> {color:#eeffff}                    col{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}value{color}{color:#89ddff}'{color}{color:#89ddff}]{color}{color:#eeffff} {color}{color:#c792ea}={color}{color:#eeffff} enum_value{color}{color:#89ddff}[{color}{color:#b2ccd6}int{color}{color:#89ddff}({color}{color:#eeffff}col{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}value{color}{color:#89ddff}'{color}{color:#89ddff}]){color}{color:#eeffff} {color}{color:#c792ea}-{color}{color:#eeffff} {color}{color:#f78c6c}1{color}{color:#89ddff}]{color}
> {color:#eeffff}            output_text {color}{color:#c792ea}={color}{color:#eeffff} json.dumps{color}{color:#89ddff}({color}{color:#eeffff}input_obj{color}{color:#89ddff}){color}
> {color:#eeffff}            outputStream.write{color}{color:#89ddff}({color}{color:#eeffff}StringUtil.toBytes{color}{color:#89ddff}({color}{color:#eeffff}output_text{color}{color:#89ddff})){color}
> {color:#eeffff}        {color}{color:#89ddff}except{color}{color:#eeffff}:{color}
> {color:#eeffff}            traceback.print_exc{color}{color:#89ddff}({color}{color:#ff5370}file{color}{color:#c792ea}={color}{color:#eeffff}sys.stdout{color}{color:#89ddff}){color}
> {color:#eeffff}            {color}{color:#89ddff}raise{color}
> {color:#eeffff}flowFile {color}{color:#c792ea}={color}{color:#eeffff} session.get{color}{color:#89ddff}(){color}
> {color:#89ddff}if{color}{color:#eeffff} flowFile {color}{color:#c792ea}!={color}{color:#eeffff} {color}{color:#f78c6c}None{color}{color:#eeffff}:{color}
> {color:#eeffff}    flowFile {color}{color:#c792ea}={color}{color:#eeffff} session.write{color}{color:#89ddff}({color}{color:#eeffff}flowFile, TransformCallback{color}{color:#89ddff}()){color}
> {color:#89ddff}    {color}{color:#546e7a}# Finish by transferring the FlowFile to an output relationship{color}
> {color:#eeffff}session.transfer{color}{color:#89ddff}({color}{color:#eeffff}flowFile, REL_SUCCESS{color}{color:#89ddff}){color}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)