Posted to user@flink.apache.org by Curtis Jensen <cu...@gmail.com> on 2022/11/29 23:25:17 UTC

"An illegal reflective access operation has occurred" during KeyedStream process

Hello,

Using Flink version 1.15.0, I receive these warnings when trying a
small example (code below):
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/cjensen/data/tools/apache-maven-3.3.9/m2/org/apache/flink/flink-core/1.15.0/flink-core-1.15.0.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release

I am undoubtedly doing something incorrectly, but felt that it may be
useful to take the advice "Please consider reporting this to the
maintainers of org.apache.flink.api.java.ClosureCleaner".
Also, any corrections to my example would be appreciated.

Thanks,
Curtis




AvgAmount.java

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class AvgAmount {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    DataStream<ExampleData.PurchaseEvent> purchaseStream =
        env.fromElements(ExampleData.PURCHASE_EVENTS);
    // Typed KeyedStream (event type, key type) instead of a raw type.
    KeyedStream<ExampleData.PurchaseEvent, String> keyedPurchaseStream =
        purchaseStream.keyBy(event -> event.account_id);
    keyedPurchaseStream.process(new PurchaseEventProcessor())
        .map(stats -> stats.toString())
        .print();

    // print() only declares a sink; the job runs when execute() is called.
    env.execute("AvgAmount");
  }

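  public static class PurchaseStats {
    public String accountId;
    public long amountSum;
    public long amountCount;

    // Public no-argument constructor so Flink can treat this class as a POJO
    // rather than falling back to generic (Kryo) serialization.
    public PurchaseStats() { }

    public PurchaseStats(String accountId) {
      this.accountId = accountId;
    }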

    public void addAmount(long amount) {
      amountSum += amount;
      amountCount += 1;
    }

    @Override
    public String toString() {
      return String.format("{\"account_id\":\"%s\",\"avg_amount\":%f}",
          accountId, (double) amountSum / (double) amountCount);
    }
  }

  public static class PurchaseEventProcessor
      extends KeyedProcessFunction<String, ExampleData.PurchaseEvent, PurchaseStats> {
    ValueState<PurchaseStats> seen;

    @Override
    public void open(Configuration parameters) {
      seen = getRuntimeContext().getState(
          new ValueStateDescriptor<>("seen", PurchaseStats.class));
    }

    @Override
    public void processElement(
        ExampleData.PurchaseEvent purchaseEvent,
        KeyedProcessFunction<String, ExampleData.PurchaseEvent, PurchaseStats>.Context context,
        Collector<PurchaseStats> out) throws Exception {
      PurchaseStats currentStats = seen.value();
      if (currentStats == null) {
        currentStats = new PurchaseStats(purchaseEvent.account_id);
      }

      currentStats.addAmount(purchaseEvent.amount);

      seen.update(currentStats);
      out.collect(currentStats);
    }
  }
}

ExampleData.java

import java.time.Instant;

public class ExampleData {
    public static final PurchaseEvent[] PURCHASE_EVENTS =
            new PurchaseEvent[] {
                    new PurchaseEvent("1337Gamer", "192.168.0.1", 1000),
                    new PurchaseEvent("1337", "127.0.0.1", 1000),
                    new PurchaseEvent("1337", "127.0.0.2", 100),
                    new PurchaseEvent("1337", "127.0.0.1", 9900)
            };

    public static class PurchaseEvent {
        public long timestamp;
        public String account_id;
        public String ip;
        public long amount;

        public PurchaseEvent() { }

        public PurchaseEvent(String accountId, String ip, long amount) {
            this(Instant.now().getEpochSecond(), accountId, ip, amount);
        }

        public PurchaseEvent(long timestamp, String accountId, String ip, long amount) {
            this.timestamp = timestamp;
            this.account_id = accountId;
            this.ip = ip;
            this.amount = amount;
        }
    }
}
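
As a cross-check for what the job above should emit, the per-key running average can be reproduced without Flink. This is only a sketch: RunningAverageCheck and its HashMap are hypothetical stand-ins for the keyed ValueState, and it walks the four events in declaration order, whereas BATCH mode may group the output by key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the original example.
public class RunningAverageCheck {

    // Returns one "accountId=average" entry per input event, in input order.
    static List<String> runningAverages(String[] accountIds, long[] amounts) {
        // key -> {sum, count}; plays the role of the per-key ValueState
        Map<String, long[]> stats = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < accountIds.length; i++) {
            long[] s = stats.computeIfAbsent(accountIds[i], k -> new long[2]);
            s[0] += amounts[i];
            s[1] += 1;
            out.add(accountIds[i] + "=" + (double) s[0] / s[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        // Same account ids and amounts as ExampleData.PURCHASE_EVENTS.
        String[] ids = {"1337Gamer", "1337", "1337", "1337"};
        long[] amounts = {1000, 1000, 100, 9900};
        runningAverages(ids, amounts).forEach(System.out::println);
    }
}
```

The first three entries are 1337Gamer=1000.0, 1337=1000.0 and 1337=550.0; the last is 11000/3 for account 1337.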

Re: "An illegal reflective access operation has occurred" during KeyedStream process

Posted by Curtis Jensen <cu...@gmail.com>.
I changed the terminal ".map(...)" and ".print()" calls to:
.executeAndCollect()
.forEachRemaining(System.out::println);

The warnings were replaced with:
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.twitter.chill.java.ArraysAsListSerializer (file:/Users/cjensen/data/tools/apache-maven-3.3.9/m2/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar) to field java.util.Arrays$ArrayList.a
WARNING: Please consider reporting this to the maintainers of com.twitter.chill.java.ArraysAsListSerializer
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
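
Both warnings come from the JDK rather than from the job logic. On JDK 9 through 15, classpath code that calls setAccessible(true) on a non-public member of a java.base class is allowed but triggers this warning on the first such access, and from JDK 16 the same call is denied by default. The following is a minimal sketch of that mechanism in plain Java. It is not Flink's or chill's actual code, and ReflectiveAccessDemo and tryAccess are hypothetical names used only for illustration.

```java
import java.lang.reflect.Field;

// Hypothetical demo class; it only mimics the kind of access named in the warning.
public class ReflectiveAccessDemo {

    // Tries to open the private field java.lang.String.value via reflection.
    static String tryAccess() {
        try {
            Field value = String.class.getDeclaredField("value");
            // This is the call the JDK reports as "illegal reflective access".
            value.setAccessible(true);
            return "allowed";
        } catch (NoSuchFieldException e) {
            return "no such field";
        } catch (RuntimeException e) {
            // On JDK 16+ without --add-opens this is expected to be an
            // InaccessibleObjectException (a RuntimeException subclass).
            return "denied: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryAccess());
    }
}
```

The result depends on the JDK the program runs on, so no fixed output is given here. To make such access legal, and so silence the warning, the JVM option --add-opens java.base/java.lang=ALL-UNNAMED opens that package to classpath code (java.base/java.util would be the counterpart for the second warning). Whether adding it is appropriate for a given Flink setup is a separate question.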
