You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Marco Vieira (Jira)" <ji...@apache.org> on 2020/07/10 02:09:00 UTC

[jira] [Updated] (SPARK-32265) PySpark regexp_replace does not work as expected for the following pattern

     [ https://issues.apache.org/jira/browse/SPARK-32265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Marco Vieira updated SPARK-32265:
---------------------------------
    Description: 
I'm using spark streaming to consume from a topic and make transformations on the data.
Amidst these is a regex replacement.
The `regexp_replace` function from `pyspark.sql.functions` is not replacing the following pattern (I tested it beforehand using regex101.com, `re` from python, etc):


{code:python}
df.withColumn('value', f.regexp_replace('value', '([A-Za-z]+=[^,]*?)(\[[A-Z,a-z,0-9]+\])',r'$1'))
{code}


this is a snippet of the record:
{code:hocon}
{someVersion=8.3.2-hmg-dev, someUnitName=IB, someMessage=Test. [BL056], someOrigin=MOBILE, someStatus=TEST, duration=3500, 
{code}

and This is the "target" of the regex pattern:
{code:hocon}
 someMessage=Test. [BL056]` 
{code}


It should match the entire target and split in two groups, and replace it by the first group matched alone (as by `r'$1'`).

These are also patterns that didn't work:
* ` df.withColumn('value', f.regexp_replace('value', '([A-Za-z]+=[^,]*?)',''))`
* ` df.withColumn('value', f.regexp_replace('value', '(\[[A-Z,a-z,0-9]+\])',''))`

This worked:
* ` df.withColumn('value', f.regexp_replace('value', 'someMessage=Test. [BL056]',''))`

Why is this happening? Are there specificities to the spark regex engine? 
What would be the right pattern for what I'm trying to do?

Examples and the entire script is listed below: 


This is an example value of the "value" column:

{code:hocon}
{someVersion=8.3.2-hmg-dev, someUnitName=IB, someMessage=Test. [BL056], someOrigin=MOBILE, someStatus=TEST, duration=3500, someNumber=9872329, someAppOrigin=APP_PADRAO, someId=c3ASAUSQTiWvl_YA9DYpDV:APA91bGfVcLNNGL20hfmaDDS0D8TuzJDuCjj4tgbRNcJcYASIBRVEE2FnA4exnE4ZWTuupRX7FQkdcJiMWkNEatk8lktkFcpR7P7mehb4r_SVnabIabGInjagGZ6pGyweDkxW2JUGK8g, someType=00001, someOriginOpen=null, someOS=null, eventSubType=TESTLOGON, someToken=, ip=error, somePair=0.4220043,-1.084015, eventType=SUCESSO, someMag=aWg4V01qSxDMjAvWmlEWGJ6aExnc2nZJbWZVPQ==, macAddress=33d94a3f7d2f8aff, someJSON=\{"ip":"error","hostname":null,"type":null,"concode":null,"continent":null,"country":null,"country_name":null,"code":null,"name":null,"city":null,"zip":null,"latitude":null,"longitude":null,"anotherJSON":{"id":null,"capital":null,"languages":null,"flag":null,"flag_emoji":null,"flag_emoji_unicode":null,"calling_code":null,"is_eu":null},"time_zone":\{"id":null,"current_time":null,"gmt_offset":null,"code":null,"is_daylight_saving":null},"currency":\{"code":null,"name":null,"plural":null,"symbol":null,"symbol_native":null},"connection":\{"asn":null,"isp":null},"security":\{"is_proxy":null,"proxy_type":null,"is_crawler":null,"crawler_name":null,"crawler_type":null,"is_tor":null,"threat_level":null,"threat_types":null}}, organization=IBPF, codigoCliente=440149, device=Android SDK built for x86, eventDate=6/1/20 4:03 PM}
{code}

This is the whole code: 

{code:python}

import re
import json
import pyhocon
import fastavro
import requests
from io import BytesIO
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()


def decode(msg, schema):
    bytes_io = BytesIO(msg)
    bytes_io.seek(5)
    msg = fastavro.schemaless_reader(bytes_io, schema)
    return msg


def parse(msg):
    conf = pyhocon.ConfigParser.parse(msg)
    msg_converter = pyhocon.tool.HOCONConverter.to_json(conf)
    msg = json.loads(msg_converter)
    return msg


def get_schema(registry_url,topic):
    URL = f'\{registry_url}/subjects/\{topic}/versions/latest'
    response = requests.get(url=URL, verify=False)
    subject = response.json()
    schema_id = subject['id']
    schema = json.loads(subject['schema'])
    return [schema_id, schema]


schema_id, schema = get_schema(registry_url=SCHEMA_REGISTRY,topic=SUBSCRIBE_TOPIC)
spark.udf.register('decode',lambda value: decode(value,schema))
spark.udf.register('parse',parse)
spark.readStream \
 .format('kafka') \
 .option('subscribe', SUBSCRIBE_TOPIC) \
 .option('startingOffsets', 'earliest') \
 .option('kafka.bootstrap.servers', HOST) \
 .option('kafka.security.protocol', 'SSL') \
 .option('kafka.ssl.key.password', KEYSTORE_PASSWORD) \
 .option('kafka.ssl.keystore.location', KEYSTORE_PATH) \
 .option('kafka.ssl.truststore.location', KEYSTORE_PATH) \
 .option('kafka.ssl.keystore.password', KEYSTORE_PASSWORD) \
 .option('kafka.ssl.truststore.password', KEYSTORE_PASSWORD) \
 .load() \
 .selectExpr(f'decode(value) as value') \
 .withColumn('value', f.regexp_replace('value', '([A-Za-z]+=[^,]*?)(\[[A-Z,a-z,1-9]+\])','$1'))\
 .writeStream \
 .format('console') \
 .option('truncate', 'false') \
 .start()
{code}

  was:
I'm using spark streaming to consume from a topic and make transformations on the data.
Amidst these is a regex replacement.
The `regexp_replace` function from `pyspark.sql.functions` is not replacing the following pattern (I tested it beforehand using regex101.com, `re` from python, etc):

` df.withColumn('value', f.regexp_replace('value', '([A-Za-z]+=[^,]*?)(\[[A-Z,a-z,0-9]+\])',r'$1'))`

this is a snippet of the record:
```
{someVersion=8.3.2-hmg-dev, someUnitName=IB, someMessage=Test. [BL056], someOrigin=MOBILE, someStatus=TEST, duration=3500, 
```

and This is the "target" of the regex pattern:
` someMessage=Test. [BL056]`

It should match the entire target and split in two groups, and replace it by the first group matched alone (as by `r'$1'`).

These are also patterns that didn't work:
# ` df.withColumn('value', f.regexp_replace('value', '([A-Za-z]+=[^,]*?)',''))`
# ` df.withColumn('value', f.regexp_replace('value', '(\[[A-Z,a-z,0-9]+\])',''))`

This worked:
# ` df.withColumn('value', f.regexp_replace('value', 'someMessage=Test. [BL056]',''))`

Why is this happening? Are there specificities to the spark regex engine? 
What would be the right pattern for what I'm trying to do?

Examples and the entire script is listed below: 


This is an example value of the "value" column:

```hocon

{someVersion=8.3.2-hmg-dev, someUnitName=IB, someMessage=Test. [BL056], someOrigin=MOBILE, someStatus=TEST, duration=3500, someNumber=9872329, someAppOrigin=APP_PADRAO, someId=c3ASAUSQTiWvl_YA9DYpDV:APA91bGfVcLNNGL20hfmaDDS0D8TuzJDuCjj4tgbRNcJcYASIBRVEE2FnA4exnE4ZWTuupRX7FQkdcJiMWkNEatk8lktkFcpR7P7mehb4r_SVnabIabGInjagGZ6pGyweDkxW2JUGK8g, someType=00001, someOriginOpen=null, someOS=null, eventSubType=TESTLOGON, someToken=, ip=error, somePair=0.4220043,-1.084015, eventType=SUCESSO, someMag=aWg4V01qSxDMjAvWmlEWGJ6aExnc2nZJbWZVPQ==, macAddress=33d94a3f7d2f8aff, someJSON=\{"ip":"error","hostname":null,"type":null,"concode":null,"continent":null,"country":null,"country_name":null,"code":null,"name":null,"city":null,"zip":null,"latitude":null,"longitude":null,"anotherJSON":{"id":null,"capital":null,"languages":null,"flag":null,"flag_emoji":null,"flag_emoji_unicode":null,"calling_code":null,"is_eu":null},"time_zone":\{"id":null,"current_time":null,"gmt_offset":null,"code":null,"is_daylight_saving":null},"currency":\{"code":null,"name":null,"plural":null,"symbol":null,"symbol_native":null},"connection":\{"asn":null,"isp":null},"security":\{"is_proxy":null,"proxy_type":null,"is_crawler":null,"crawler_name":null,"crawler_type":null,"is_tor":null,"threat_level":null,"threat_types":null}}, organization=IBPF, codigoCliente=440149, device=Android SDK built for x86, eventDate=6/1/20 4:03 PM}
```

This is the whole code: 

```python

import re
import json
import pyhocon
import fastavro
import requests
from io import BytesIO
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()


def decode(msg, schema):
    bytes_io = BytesIO(msg)
    bytes_io.seek(5)
    msg = fastavro.schemaless_reader(bytes_io, schema)
    return msg


def parse(msg):
    conf = pyhocon.ConfigParser.parse(msg)
    msg_converter = pyhocon.tool.HOCONConverter.to_json(conf)
    msg = json.loads(msg_converter)
    return msg


def get_schema(registry_url,topic):
    URL = f'\{registry_url}/subjects/\{topic}/versions/latest'
    response = requests.get(url=URL, verify=False)
    subject = response.json()
    schema_id = subject['id']
    schema = json.loads(subject['schema'])
    return [schema_id, schema]


schema_id, schema = get_schema(registry_url=SCHEMA_REGISTRY,topic=SUBSCRIBE_TOPIC)
spark.udf.register('decode',lambda value: decode(value,schema))
spark.udf.register('parse',parse)
spark.readStream \
 .format('kafka') \
 .option('subscribe', SUBSCRIBE_TOPIC) \
 .option('startingOffsets', 'earliest') \
 .option('kafka.bootstrap.servers', HOST) \
 .option('kafka.security.protocol', 'SSL') \
 .option('kafka.ssl.key.password', KEYSTORE_PASSWORD) \
 .option('kafka.ssl.keystore.location', KEYSTORE_PATH) \
 .option('kafka.ssl.truststore.location', KEYSTORE_PATH) \
 .option('kafka.ssl.keystore.password', KEYSTORE_PASSWORD) \
 .option('kafka.ssl.truststore.password', KEYSTORE_PASSWORD) \
 .load() \
 .selectExpr(f'decode(value) as value') \
 .withColumn('value', f.regexp_replace('value', '([A-Za-z]+=[^,]*?)(\[[A-Z,a-z,1-9]+\])','$1'))\
 .writeStream \
 .format('console') \
 .option('truncate', 'false') \
 .start()
``` 



> PySpark regexp_replace does not work as expected for the following pattern
> --------------------------------------------------------------------------
>
>                 Key: SPARK-32265
>                 URL: https://issues.apache.org/jira/browse/SPARK-32265
>             Project: Spark
>          Issue Type: Question
>          Components: PySpark, Spark Core, SQL, Structured Streaming
>    Affects Versions: 2.4.4
>            Reporter: Marco Vieira
>            Priority: Major
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> I'm using spark streaming to consume from a topic and make transformations on the data.
> Amidst these is a regex replacement.
> The `regexp_replace` function from `pyspark.sql.functions` is not replacing the following pattern (I tested it beforehand using regex101.com, `re` from python, etc):
> {code:python}
> df.withColumn('value', f.regexp_replace('value', '([A-Za-z]+=[^,]*?)(\[[A-Z,a-z,0-9]+\])',r'$1'))
> {code}
> this is a snippet of the record:
> {code:hocon}
> {someVersion=8.3.2-hmg-dev, someUnitName=IB, someMessage=Test. [BL056], someOrigin=MOBILE, someStatus=TEST, duration=3500, 
> {code}
> and This is the "target" of the regex pattern:
> {code:hocon}
>  someMessage=Test. [BL056]` 
> {code}
> It should match the entire target and split in two groups, and replace it by the first group matched alone (as by `r'$1'`).
> These are also patterns that didn't work:
> * ` df.withColumn('value', f.regexp_replace('value', '([A-Za-z]+=[^,]*?)',''))`
> * ` df.withColumn('value', f.regexp_replace('value', '(\[[A-Z,a-z,0-9]+\])',''))`
> This worked:
> * ` df.withColumn('value', f.regexp_replace('value', 'someMessage=Test. [BL056]',''))`
> Why is this happening? Are there specificities to the spark regex engine? 
> What would be the right pattern for what I'm trying to do?
> Examples and the entire script is listed below: 
> This is an example value of the "value" column:
> {code:hocon}
> {someVersion=8.3.2-hmg-dev, someUnitName=IB, someMessage=Test. [BL056], someOrigin=MOBILE, someStatus=TEST, duration=3500, someNumber=9872329, someAppOrigin=APP_PADRAO, someId=c3ASAUSQTiWvl_YA9DYpDV:APA91bGfVcLNNGL20hfmaDDS0D8TuzJDuCjj4tgbRNcJcYASIBRVEE2FnA4exnE4ZWTuupRX7FQkdcJiMWkNEatk8lktkFcpR7P7mehb4r_SVnabIabGInjagGZ6pGyweDkxW2JUGK8g, someType=00001, someOriginOpen=null, someOS=null, eventSubType=TESTLOGON, someToken=, ip=error, somePair=0.4220043,-1.084015, eventType=SUCESSO, someMag=aWg4V01qSxDMjAvWmlEWGJ6aExnc2nZJbWZVPQ==, macAddress=33d94a3f7d2f8aff, someJSON=\{"ip":"error","hostname":null,"type":null,"concode":null,"continent":null,"country":null,"country_name":null,"code":null,"name":null,"city":null,"zip":null,"latitude":null,"longitude":null,"anotherJSON":{"id":null,"capital":null,"languages":null,"flag":null,"flag_emoji":null,"flag_emoji_unicode":null,"calling_code":null,"is_eu":null},"time_zone":\{"id":null,"current_time":null,"gmt_offset":null,"code":null,"is_daylight_saving":null},"currency":\{"code":null,"name":null,"plural":null,"symbol":null,"symbol_native":null},"connection":\{"asn":null,"isp":null},"security":\{"is_proxy":null,"proxy_type":null,"is_crawler":null,"crawler_name":null,"crawler_type":null,"is_tor":null,"threat_level":null,"threat_types":null}}, organization=IBPF, codigoCliente=440149, device=Android SDK built for x86, eventDate=6/1/20 4:03 PM}
> {code}
> This is the whole code: 
> {code:python}
> import re
> import json
> import pyhocon
> import fastavro
> import requests
> from io import BytesIO
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as f
> spark = SparkSession.builder.getOrCreate()
> def decode(msg, schema):
>     bytes_io = BytesIO(msg)
>     bytes_io.seek(5)
>     msg = fastavro.schemaless_reader(bytes_io, schema)
>     return msg
> def parse(msg):
>     conf = pyhocon.ConfigParser.parse(msg)
>     msg_converter = pyhocon.tool.HOCONConverter.to_json(conf)
>     msg = json.loads(msg_converter)
>     return msg
> def get_schema(registry_url,topic):
>     URL = f'\{registry_url}/subjects/\{topic}/versions/latest'
>     response = requests.get(url=URL, verify=False)
>     subject = response.json()
>     schema_id = subject['id']
>     schema = json.loads(subject['schema'])
>     return [schema_id, schema]
> schema_id, schema = get_schema(registry_url=SCHEMA_REGISTRY,topic=SUBSCRIBE_TOPIC)
> spark.udf.register('decode',lambda value: decode(value,schema))
> spark.udf.register('parse',parse)
> spark.readStream \
>  .format('kafka') \
>  .option('subscribe', SUBSCRIBE_TOPIC) \
>  .option('startingOffsets', 'earliest') \
>  .option('kafka.bootstrap.servers', HOST) \
>  .option('kafka.security.protocol', 'SSL') \
>  .option('kafka.ssl.key.password', KEYSTORE_PASSWORD) \
>  .option('kafka.ssl.keystore.location', KEYSTORE_PATH) \
>  .option('kafka.ssl.truststore.location', KEYSTORE_PATH) \
>  .option('kafka.ssl.keystore.password', KEYSTORE_PASSWORD) \
>  .option('kafka.ssl.truststore.password', KEYSTORE_PASSWORD) \
>  .load() \
>  .selectExpr(f'decode(value) as value') \
>  .withColumn('value', f.regexp_replace('value', '([A-Za-z]+=[^,]*?)(\[[A-Z,a-z,1-9]+\])','$1'))\
>  .writeStream \
>  .format('console') \
>  .option('truncate', 'false') \
>  .start()
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org